diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e009395207..563bb3cea3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -156,7 +156,7 @@ class InitialSyncHandler(BaseHandler):
room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = run_in_background(
self.store.get_state_for_events,
- [event.event_id], None,
+ [event.event_id],
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler):
def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
membership, member_event_id, is_peeking):
room_state = yield self.store.get_state_for_events(
- [member_event_id], None
+ [member_event_id],
)
room_state = room_state[member_event_id]
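An illustrative sketch, not part of the patch: what the trimmed calls above rely on, assuming the store's get_state_for_events now defaults to StateFilter.all() when no filter is given. The helper name and the bare `store` argument are made up for the example.

    from twisted.internet import defer

    from synapse.storage.state import StateFilter


    @defer.inlineCallbacks
    def fetch_state_at_event(store, event_id):
        # Sketch: omitting the filter (as the patch now does) is assumed to be
        # equivalent to passing the catch-all filter explicitly.
        state_by_event = yield store.get_state_for_events(
            [event_id], state_filter=StateFilter.all(),
        )
        # get_state_for_events maps event_id -> {(event_type, state_key): event}.
        defer.returnValue(state_by_event[event_id])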
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 6c4fcfb10a..969e588e73 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -35,6 +35,7 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -80,7 +81,7 @@ class MessageHandler(object):
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
- [membership_event_id], [key]
+ [membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
@@ -88,7 +89,7 @@ class MessageHandler(object):
@defer.inlineCallbacks
def get_state_events(
- self, user_id, room_id, types=None, filtered_types=None,
+ self, user_id, room_id, state_filter=StateFilter.all(),
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
@@ -100,13 +101,8 @@ class MessageHandler(object):
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
at_token(StreamToken|None): the stream token at which we are requesting
the state. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
@@ -139,7 +135,7 @@ class MessageHandler(object):
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
- [event.event_id], types, filtered_types=filtered_types,
+ [event.event_id], state_filter=state_filter,
)
room_state = room_state[event.event_id]
else:
@@ -158,12 +154,12 @@ class MessageHandler(object):
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
- room_id, types, filtered_types=filtered_types,
+ room_id, state_filter=state_filter,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
- [membership_event_id], types, filtered_types=filtered_types,
+ [membership_event_id], state_filter=state_filter,
)
room_state = room_state[membership_event_id]
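Illustrative sketch only, not from the patch: how the single-key StateFilter.from_types call in the Membership.LEAVE branch above behaves. The helper name, the bare `store` argument and the choice of EventTypes.Name are assumptions for the example.

    from twisted.internet import defer

    from synapse.api.constants import EventTypes
    from synapse.storage.state import StateFilter


    @defer.inlineCallbacks
    def get_state_as_of_leave(store, member_event_id):
        # Sketch: restrict the fetch to a single (event_type, state_key) pair,
        # mirroring the LEAVE branch above.
        key = (EventTypes.Name, "")
        room_state = yield store.get_state_for_events(
            [member_event_id], StateFilter.from_types([key]),
        )
        # The result is keyed by event id first, then by (type, state_key).
        defer.returnValue(room_state[member_event_id].get(key))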
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a155b6e938..43f81bd607 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -21,6 +21,7 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
+from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.logcontext import run_in_background
@@ -255,16 +256,14 @@ class PaginationHandler(object):
if event_filter and event_filter.lazy_load_members():
# TODO: remove redundant members
- types = [
- (EventTypes.Member, state_key)
- for state_key in set(
- event.sender # FIXME: we also care about invite targets etc.
- for event in events
- )
- ]
+ # FIXME: we also care about invite targets etc.
+ state_filter = StateFilter.from_types(
+ (EventTypes.Member, event.sender)
+ for event in events
+ )
state_ids = yield self.store.get_state_ids_for_event(
- events[0].event_id, types=types,
+ events[0].event_id, state_filter=state_filter,
)
if state_ids:
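For illustration, not part of the patch: the lazy-loading pattern used above, written as a standalone helper. The function name and the bare `store` argument are invented for the sketch; from_types accepts any iterable of (type, state_key) tuples, so a generator over the timeline senders works directly.

    from twisted.internet import defer

    from synapse.api.constants import EventTypes
    from synapse.storage.state import StateFilter


    @defer.inlineCallbacks
    def member_state_ids_for_timeline(store, events):
        # Sketch: only m.room.member events for the timeline senders are
        # requested; duplicate senders simply collapse into one filter entry.
        state_filter = StateFilter.from_types(
            (EventTypes.Member, event.sender) for event in events
        )
        state_ids = yield store.get_state_ids_for_event(
            events[0].event_id, state_filter=state_filter,
        )
        # state_ids maps (EventTypes.Member, user_id) -> membership event id.
        defer.returnValue(state_ids)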
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ab1571b27b..3ba92bdb4c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -33,6 +33,7 @@ from synapse.api.constants import (
RoomCreationPreset,
)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@@ -489,23 +490,24 @@ class RoomContextHandler(object):
else:
last_event_id = event_id
- types = None
- filtered_types = None
if event_filter and event_filter.lazy_load_members():
- members = set(ev.sender for ev in itertools.chain(
- results["events_before"],
- (results["event"],),
- results["events_after"],
- ))
- filtered_types = [EventTypes.Member]
- types = [(EventTypes.Member, member) for member in members]
+ state_filter = StateFilter.from_lazy_load_member_list(
+ ev.sender
+ for ev in itertools.chain(
+ results["events_before"],
+ (results["event"],),
+ results["events_after"],
+ )
+ )
+ else:
+ state_filter = StateFilter.all()
# XXX: why do we return the state as of the last event rather than the
# first? Shouldn't we be consistent with /sync?
# https://github.com/matrix-org/matrix-doc/issues/687
state = yield self.store.get_state_for_events(
- [last_event_id], types, filtered_types=filtered_types,
+ [last_event_id], state_filter=state_filter,
)
results["state"] = list(state[last_event_id].values())
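A side note, not from the patch: the two constructors used in this change appear to have different semantics, which is why from_lazy_load_member_list replaces the old `types` plus `filtered_types=[EventTypes.Member]` pair here. A small sketch, with an example user id:

    from synapse.api.constants import EventTypes
    from synapse.storage.state import StateFilter

    # Sketch: limit member events to the given senders, but leave every other
    # event type unfiltered (the old filtered_types=[EventTypes.Member] shape).
    lazy = StateFilter.from_lazy_load_member_list(["@alice:example.com"])

    # Sketch: fetch only the listed keys and nothing else (the old shape of
    # passing `types` with filtered_types=None).
    members_only = StateFilter.from_types([
        (EventTypes.Member, "@alice:example.com"),
    ])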
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 351892a94f..09739f2862 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -27,6 +27,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
+from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -469,25 +470,20 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event, types=None, filtered_types=None):
+ def get_state_after_event(self, event, state_filter=StateFilter.all()):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
state_ids = yield self.store.get_state_ids_for_event(
- event.event_id, types, filtered_types=filtered_types,
+ event.event_id, state_filter=state_filter,
)
if event.is_state():
state_ids = state_ids.copy()
@@ -495,18 +491,14 @@ class SyncHandler(object):
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
+ def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
@@ -522,7 +514,7 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
state = yield self.get_state_after_event(
- last_event, types, filtered_types=filtered_types,
+ last_event, state_filter=state_filter,
)
else:
@@ -563,10 +555,11 @@ class SyncHandler(object):
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
- last_event.event_id, [
+ last_event.event_id,
+ state_filter=StateFilter.from_types([
(EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''),
- ]
+ ]),
)
# this is heavily cached, thus: fast.
@@ -717,8 +710,7 @@ class SyncHandler(object):
with Measure(self.clock, "compute_state_delta"):
- types = None
- filtered_types = None
+ members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@@ -729,16 +721,21 @@ class SyncHandler(object):
# We only request state for the members needed to display the
# timeline:
- types = [
- (EventTypes.Member, state_key)
- for state_key in set(
- event.sender # FIXME: we also care about invite targets etc.
- for event in batch.events
- )
- ]
+ members_to_fetch = set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in batch.events
+ )
+
+ if full_state:
+ # always make sure we LL ourselves so we know we're in the room
+ # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+                    # We only need to apply this on full state syncs given we disabled
+ # LL for incr syncs in #3840.
+ members_to_fetch.add(sync_config.user.to_string())
- # only apply the filtering to room members
- filtered_types = [EventTypes.Member]
+ state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+ else:
+ state_filter = StateFilter.all()
timeline_state = {
(event.type, event.state_key): event.event_id
@@ -746,28 +743,19 @@ class SyncHandler(object):
}
if full_state:
- if lazy_load_members:
- # always make sure we LL ourselves so we know we're in the room
- # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
- # We only need apply this on full state syncs given we disabled
- # LL for incr syncs in #3840.
- types.append((EventTypes.Member, sync_config.user.to_string()))
-
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[-1].event_id, state_filter=state_filter,
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[0].event_id, state_filter=state_filter,
)
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token, types=types,
- filtered_types=filtered_types,
+ room_id, stream_position=now_token,
+ state_filter=state_filter,
)
state_ids = current_state_ids
@@ -781,8 +769,7 @@ class SyncHandler(object):
)
elif batch.limited:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[0].event_id, state_filter=state_filter,
)
# for now, we disable LL for gappy syncs - see
@@ -797,17 +784,15 @@ class SyncHandler(object):
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
- types = None
- filtered_types = None
+ state_filter = StateFilter.all()
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token, types=types,
- filtered_types=filtered_types,
+ room_id, stream_position=since_token,
+ state_filter=state_filter,
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id, types=types,
- filtered_types=filtered_types,
+ batch.events[-1].event_id, state_filter=state_filter,
)
state_ids = _calculate_state(
@@ -821,7 +806,7 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
- if types and batch.events:
+ if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
@@ -831,8 +816,12 @@ class SyncHandler(object):
# timeline here, and then dedupe any redundant ones below.
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id, types=types,
- filtered_types=None, # we only want members!
+ batch.events[0].event_id,
+ # we only want members!
+ state_filter=StateFilter.from_types(
+ (EventTypes.Member, member)
+ for member in members_to_fetch
+ ),
)
if lazy_load_members and not include_redundant_members:
|