diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index dee387eb7f..695f1a7375 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -24,7 +24,6 @@ from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
-from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
@@ -183,15 +182,12 @@ class TransactionQueue(object):
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
- users_in_room = yield self.state.get_current_user_in_room(
+ destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- )
if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index cb13874ccf..72820ec42a 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,17 +19,21 @@ from twisted.internet import defer
from .push_rule_evaluator import PushRuleEvaluatorForEvent
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
logger = logging.getLogger(__name__)
+rules_by_room = {}
+
+
@defer.inlineCallbacks
def evaluator_for_event(event, hs, store, context):
- rules_by_user = yield store.bulk_get_push_rules_for_room(
- event, context
- )
+ room_id = event.room_id
+ rules_for_room = rules_by_room.setdefault(room_id, RulesForRoom(hs, room_id))
+
+ rules_by_user = yield rules_for_room.get_rules(context)
# if this event is an invite event, we may need to run rules for the user
# who's been invited, otherwise they won't get told they've been invited
@@ -66,11 +70,14 @@ class BulkPushRuleEvaluator:
def action_for_event_by_user(self, event, context):
actions_by_user = {}
- room_members = yield self.store.get_joined_users_from_context(
- event, context
- )
+ # room_members = yield self.store.get_joined_users_from_context(
+ # event, context
+ # )
+ room_members = {}
+
+ num_room_members = yield self.store.get_number_of_users_in_rooms(event.room_id)
- evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
+ evaluator = PushRuleEvaluatorForEvent(event, num_room_members)
condition_cache = {}
@@ -127,3 +134,128 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache):
return False
return True
+
+
+class RulesForRoom(object):
+ def __init__(self, hs, room_id):
+ self.room_id = room_id
+ self.is_mine_id = hs.is_mine_id
+ self.store = hs.get_datastore()
+
+ self.member_map = {} # event_id -> (user_id, state)
+ self.rules_by_user = {} # user_id -> rules
+ self.state_group = object()
+
+ self.sequence = 0
+
+ @defer.inlineCallbacks
+ def get_rules(self, context):
+ state_group = context.state_group
+ current_state_ids = context.current_state_ids
+
+ if state_group and self.state_group == state_group:
+ defer.returnValue(self.rules_by_user)
+
+ ret_rules_by_user = {}
+ missing_member_event_ids = {}
+ for key, event_id in current_state_ids.iteritems():
+ res = self.member_map.get(event_id, None)
+ if res:
+ user_id, state = res
+ if state == Membership.JOIN:
+ rules = self.rules_by_user.get(user_id, None)
+ if rules:
+ ret_rules_by_user[user_id] = rules
+ continue
+
+ if key[0] != EventTypes.Member:
+ continue
+
+ user_id = key[1]
+ if not self.is_mine_id(user_id):
+ continue
+
+ if self.store.get_if_app_services_interested_in_user(user_id):
+ continue
+
+ missing_member_event_ids[user_id] = event_id
+
+ if missing_member_event_ids:
+ missing_rules = yield self.get_rules_for_member_event_ids(
+ missing_member_event_ids, state_group
+ )
+ ret_rules_by_user.update(missing_rules)
+
+ defer.returnValue(ret_rules_by_user)
+
+ @defer.inlineCallbacks
+ def get_rules_for_member_event_ids(self, member_event_ids, state_group):
+ sequence = self.sequence
+
+ rows = yield self.store._simple_select_many_batch(
+ table="room_memberships",
+ column="event_id",
+ iterable=member_event_ids.values(),
+ retcols=['user_id', 'membership', 'event_id'],
+ keyvalues={},
+ batch_size=500,
+ desc="get_rules_for_member_event_ids",
+ )
+
+ members = {
+ row["event_id"]: (row["user_id"], row["membership"])
+ for row in rows
+ }
+
+ interested_in_user_ids = set(user_id for user_id, _ in members.itervalues())
+
+ if_users_with_pushers = yield self.store.get_if_users_have_pushers(
+ interested_in_user_ids,
+ on_invalidate=self.invalidate_all,
+ )
+
+ user_ids = set(
+ uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
+ )
+
+ users_with_receipts = yield self.store.get_users_with_read_receipts_in_room(
+ self.room_id, on_invalidate=self.invalidate_all,
+ )
+
+ # any users with pushers must be ours: they have pushers
+ for uid in users_with_receipts:
+ if uid in interested_in_user_ids:
+ user_ids.add(uid)
+
+ forgotten = yield self.store.who_forgot_in_room(
+ self.room_id, on_invalidate=self.invalidate_all,
+ )
+
+ for row in forgotten:
+ user_id = row["user_id"]
+ event_id = row["event_id"]
+
+ mem_id = member_event_ids.get((user_id), None)
+ if event_id == mem_id:
+ user_ids.discard(user_id)
+
+ rules_by_user = yield self.store.bulk_get_push_rules(
+ user_ids, on_invalidate=self.invalidate_all,
+ )
+
+ rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None}
+
+ self.update_cache(sequence, members, rules_by_user, state_group)
+
+ defer.returnValue(rules_by_user)
+
+ def invalidate_all(self):
+ self.sequence += 1
+ self.member_map = {}
+ self.rules_by_user = {}
+
+ def update_cache(self, sequence, members, rules_by_user, state_group):
+ if sequence == self.sequence:
+ self.member_map.update(members)
+ self.rules_by_user.update(rules_by_user)
+ self.state_group = state_group
diff --git a/synapse/state.py b/synapse/state.py
index f6b83d888a..f8b18a4a2d 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -176,6 +176,17 @@ class StateHandler(object):
defer.returnValue(joined_users)
@defer.inlineCallbacks
+ def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
+ if not latest_event_ids:
+ latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+ logger.debug("calling resolve_state_groups from get_current_user_in_room")
+ entry = yield self.resolve_state_groups(room_id, latest_event_ids)
+ joined_hosts = yield self.store.get_joined_hosts(
+ room_id, entry.state_id, entry.state
+ )
+ defer.returnValue(joined_hosts)
+
+ @defer.inlineCallbacks
def compute_event_context(self, event, old_state=None):
"""Build an EventContext structure for the event.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c659004e8d..58b73af7d2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -60,12 +60,12 @@ class LoggingTransaction(object):
object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks)
- def call_after(self, callback, *args):
+ def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
"""
- self.after_callbacks.append((callback, args))
+ self.after_callbacks.append((callback, args, kwargs))
def __getattr__(self, name):
return getattr(self.txn, name)
@@ -319,8 +319,8 @@ class SQLBaseStore(object):
inner_func, *args, **kwargs
)
finally:
- for after_callback, after_args in after_callbacks:
- after_callback(*after_args)
+ for after_callback, after_args, after_kwargs in after_callbacks:
+ after_callback(*after_args, **after_kwargs)
defer.returnValue(result)
@defer.inlineCallbacks
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a3790419dd..a8d1b93d99 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -370,6 +370,10 @@ class EventsStore(SQLBaseStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
+ for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+ self.get_current_state_ids.prefill(
+ (room_id, ), new_state
+ )
@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
@@ -529,7 +533,7 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert
}
- defer.returnValue((to_delete, to_insert))
+ defer.returnValue((to_delete, to_insert, current_state))
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
@@ -682,7 +686,7 @@ class EventsStore(SQLBaseStore):
def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
- to_delete, to_insert = current_state_tuple
+ to_delete, to_insert, _ = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 353a135c4e..b80af941f2 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -48,7 +48,7 @@ def _load_rules(rawrules, enabled_map):
class PushRuleStore(SQLBaseStore):
- @cachedInlineCallbacks()
+ @cachedInlineCallbacks(max_entries=50000)
def get_push_rules_for_user(self, user_id):
rows = yield self._simple_select_list(
table="push_rules",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7ad2198d96..bfd1955376 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -417,25 +417,47 @@ class RoomMemberStore(SQLBaseStore):
if key[0] == EventTypes.Member
]
- rows = yield self._simple_select_many_batch(
- table="room_memberships",
- column="event_id",
- iterable=member_event_ids,
- retcols=['user_id', 'display_name', 'avatar_url'],
- keyvalues={
- "membership": Membership.JOIN,
- },
- batch_size=500,
- desc="_get_joined_users_from_context",
+ event_map = self._get_events_from_cache(
+ member_event_ids,
+ allow_rejected=False,
)
- users_in_room = {
- to_ascii(row["user_id"]): ProfileInfo(
- avatar_url=to_ascii(row["avatar_url"]),
- display_name=to_ascii(row["display_name"]),
+ missing_member_event_ids = []
+ users_in_room = {}
+ for event_id, ev_entry in event_map.iteritems():
+ if event_id:
+ if ev_entry.event.membership == Membership.JOIN:
+ users_in_room[to_ascii(ev_entry.event.state_key)] = ProfileInfo(
+ display_name=to_ascii(
+ ev_entry.event.content.get("displayname", None)
+ ),
+ avatar_url=to_ascii(
+ ev_entry.event.content.get("avatar_url", None)
+ ),
+ )
+ else:
+ missing_member_event_ids.append(event_id)
+
+ if missing_member_event_ids:
+ rows = yield self._simple_select_many_batch(
+ table="room_memberships",
+ column="event_id",
+ iterable=member_event_ids,
+ retcols=('user_id', 'display_name', 'avatar_url',),
+ keyvalues={
+ "membership": Membership.JOIN,
+ },
+ batch_size=500,
+ desc="_get_joined_users_from_context",
)
- for row in rows
- }
+
+ users_in_room.update({
+ to_ascii(row["user_id"]): ProfileInfo(
+ avatar_url=to_ascii(row["avatar_url"]),
+ display_name=to_ascii(row["display_name"]),
+ )
+ for row in rows
+ })
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -447,6 +469,17 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(users_in_room)
+ def get_number_of_users_in_rooms(self, room_id):
+ sql = """SELECT coalesce(count(*), 0) FROM current_state_events
+ INNER JOIN room_memberships USING (room_id, event_id)
+ WHERE room_id = ? AND membership = 'join'
+ """
+ return self._execute(
+ "get_number_of_users_in_rooms",
+ lambda txn: txn.fetchone()[0],
+ sql, room_id
+ )
+
def is_host_joined(self, room_id, host, state_group, state_ids):
if not state_group:
# If state_group is None it means it has yet to be assigned a
@@ -482,6 +515,44 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(False)
+ def get_joined_hosts(self, room_id, state_group, state_ids):
+ if not state_group:
+ # If state_group is None it means it has yet to be assigned a
+ # state group, i.e. we need to make sure that calls with a state_group
+ # of None don't hit previous cached calls with a None state_group.
+ # To do this we set the state_group to a new object as object() != object()
+ state_group = object()
+
+ return self._get_joined_hosts(
+ room_id, state_group, state_ids
+ )
+
+ @cachedInlineCallbacks(num_args=3)
+ def _get_joined_hosts(self, room_id, state_group, current_state_ids):
+ # We don't use `state_group`, its there so that we can cache based
+ # on it. However, its important that its never None, since two current_state's
+ # with a state_group of None are likely to be different.
+ # See bulk_get_push_rules_for_room for how we work around this.
+ assert state_group is not None
+
+ joined_hosts = set()
+ for (etype, state_key), event_id in current_state_ids.items():
+ if etype == EventTypes.Member:
+ try:
+ host = get_domain_from_id(state_key)
+ except:
+ logger.warn("state_key not user_id: %s", state_key)
+ continue
+
+ if host in joined_hosts:
+ continue
+
+ event = yield self.get_event(event_id, allow_none=True)
+ if event and event.content["membership"] == Membership.JOIN:
+ joined_hosts.add(host)
+
+ defer.returnValue(joined_hosts)
+
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a16afa8df5..1e1ce87e0e 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -227,6 +227,14 @@ class StateStore(SQLBaseStore):
],
)
+ txn.call_after(
+ self._state_group_cache.update,
+ self._state_group_cache.sequence,
+ key=context.state_group,
+ value=context.current_state_ids,
+ full=True,
+ )
+
self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
|