diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a6429d17ed..0a1a8cc1e5 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage
+from synapse.storage.state import StateGroupStorage
__all__ = ["DataStores", "DataStore"]
@@ -45,6 +46,7 @@ class Storage(object):
self.main = stores.main
self.persistence = EventsPersistenceStorage(hs, stores)
+ self.state = StateGroupStorage(hs, stores)
def are_all_users_on_domain(txn, database_engine, domain):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index f5906fcd54..1a2b7ebe25 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -494,7 +494,7 @@ class SQLBaseStore(object):
exception_callbacks = []
if LoggingContext.current_context() == LoggingContext.sentinel:
- logger.warn("Starting db txn '%s' from sentinel context", desc)
+ logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
result = yield self.runWithConnection(
@@ -532,7 +532,7 @@ class SQLBaseStore(object):
"""
parent_context = LoggingContext.current_context()
if parent_context == LoggingContext.sentinel:
- logger.warn(
+ logger.warning(
"Starting db connection from sentinel context: metrics will be lost"
)
parent_context = None
@@ -719,7 +719,7 @@ class SQLBaseStore(object):
raise
# presumably we raced with another transaction: let's retry.
- logger.warn(
+ logger.warning(
"IntegrityError when upserting into %s; retrying: %s", table, e
)
diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py
index 2d8814f4a7..10c940df1e 100644
--- a/synapse/storage/data_stores/main/__init__.py
+++ b/synapse/storage/data_stores/main/__init__.py
@@ -320,7 +320,7 @@ class DataStore(
) u
"""
txn.execute(sql, (time_from,))
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count
def count_r30_users(self):
@@ -399,7 +399,7 @@ class DataStore(
txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
results["all"] = count
return results
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 22025effbc..04ce21ac66 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -863,7 +863,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
stream_row = txn.fetchone()
if stream_row:
- offset_stream_ordering, = stream_row
+ (offset_stream_ordering,) = stream_row
rotate_to_stream_ordering = min(
self.stream_ordering_day_ago, offset_stream_ordering
)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 7c3607f308..aafc2007d3 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -82,7 +82,7 @@ def _retry_on_integrity_error(func):
@defer.inlineCallbacks
def f(self, *args, **kwargs):
try:
- res = yield func(self, *args, **kwargs)
+ res = yield func(self, *args, delete_existing=False, **kwargs)
except self.database_engine.module.IntegrityError:
logger.exception("IntegrityError, retrying.")
res = yield func(self, *args, delete_existing=True, **kwargs)
@@ -1125,7 +1125,7 @@ class EventsStore(
AND stream_ordering > ?
"""
txn.execute(sql, (self.stream_ordering_day_ago,))
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_messages", _count_messages)
@@ -1146,7 +1146,7 @@ class EventsStore(
"""
txn.execute(sql, (like_clause, self.stream_ordering_day_ago))
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
@@ -1161,7 +1161,7 @@ class EventsStore(
AND stream_ordering > ?
"""
txn.execute(sql, (self.stream_ordering_day_ago,))
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_daily_active_rooms", _count)
@@ -1646,7 +1646,7 @@ class EventsStore(
""",
(room_id,),
)
- min_depth, = txn.fetchone()
+ (min_depth,) = txn.fetchone()
logger.info("[purge] updating room_depth to %d", min_depth)
@@ -1838,7 +1838,6 @@ class EventsStore(
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
- "topics",
"users_in_public_rooms",
"users_who_share_private_rooms",
# no useful index, but let's clear them anyway
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 31ea6f917f..51352b9966 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -438,7 +438,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
if not rows:
return 0
- upper_event_id, = rows[-1]
+ (upper_event_id,) = rows[-1]
# Update the redactions with the received_ts.
#
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index aeae5a2b28..b3a2771f1b 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -249,7 +249,7 @@ class GroupServerStore(SQLBaseStore):
WHERE group_id = ? AND category_id = ?
"""
txn.execute(sql, (group_id, category_id))
- order, = txn.fetchone()
+ (order,) = txn.fetchone()
if existing:
to_update = {}
@@ -509,7 +509,7 @@ class GroupServerStore(SQLBaseStore):
WHERE group_id = ? AND role_id = ?
"""
txn.execute(sql, (group_id, role_id))
- order, = txn.fetchone()
+ (order,) = txn.fetchone()
if existing:
to_update = {}
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index e6ee1e4aaa..b41c3d317a 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -171,7 +171,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count
return self.runInteraction("count_users", _count_users)
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index cd95f1ce60..b520062d84 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -143,7 +143,7 @@ class PushRulesWorkerStore(
" WHERE user_id = ? AND ? < stream_id"
)
txn.execute(sql, (user_id, last_id))
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return bool(count)
return self.runInteraction(
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index f005c1ae0a..d76861cdc0 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -44,7 +44,7 @@ class PusherWorkerStore(SQLBaseStore):
r["data"] = json.loads(dataJson)
except Exception as e:
- logger.warn(
+ logger.warning(
"Invalid JSON in data for pusher %d: %s, %s",
r["id"],
dataJson,
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 6c5b29288a..f70d41ecab 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -459,7 +459,7 @@ class RegistrationWorkerStore(SQLBaseStore):
WHERE appservice_id IS NULL
"""
)
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count
ret = yield self.runInteraction("count_users", _count_users)
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index bc04bfd7d4..2af24a20b7 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -927,7 +927,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
if not row or not row[0]:
return processed, True
- next_room, = row
+ (next_room,) = row
sql = """
UPDATE current_state_events
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 0e08497452..d1d7c6863d 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -196,7 +196,7 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
" ON event_search USING GIN (vector)"
)
except psycopg2.ProgrammingError as e:
- logger.warn(
+ logger.warning(
"Ignoring error %r when trying to switch from GIST to GIN", e
)
@@ -672,7 +672,7 @@ class SearchStore(SearchBackgroundUpdateStore):
)
)
txn.execute(query, (value, search_query))
- headline, = txn.fetchall()[0]
+ (headline,) = txn.fetchall()[0]
# Now we need to pick the possible highlights out of the haedline
# result.
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 9b2207075b..3132848034 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -725,16 +725,18 @@ class StateGroupWorkerStore(
member_filter, non_member_filter = state_filter.get_member_split()
# Now we look them up in the member and non-member caches
- non_member_state, incomplete_groups_nm, = (
- yield self._get_state_for_groups_using_cache(
- groups, self._state_group_cache, state_filter=non_member_filter
- )
+ (
+ non_member_state,
+ incomplete_groups_nm,
+ ) = yield self._get_state_for_groups_using_cache(
+ groups, self._state_group_cache, state_filter=non_member_filter
)
- member_state, incomplete_groups_m, = (
- yield self._get_state_for_groups_using_cache(
- groups, self._state_group_members_cache, state_filter=member_filter
- )
+ (
+ member_state,
+ incomplete_groups_m,
+ ) = yield self._get_state_for_groups_using_cache(
+ groups, self._state_group_members_cache, state_filter=member_filter
)
state = dict(non_member_state)
@@ -1076,7 +1078,7 @@ class StateBackgroundUpdateStore(
" WHERE id < ? AND room_id = ?",
(state_group, room_id),
)
- prev_group, = txn.fetchone()
+ (prev_group,) = txn.fetchone()
new_last_state_group = state_group
if prev_group:
diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 4d59b7833f..45b3de7d56 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -773,7 +773,7 @@ class StatsStore(StateDeltasStore):
(room_id,),
)
- current_state_events_count, = txn.fetchone()
+ (current_state_events_count,) = txn.fetchone()
users_in_room = self.get_users_in_room_txn(txn, room_id)
@@ -863,7 +863,7 @@ class StatsStore(StateDeltasStore):
""",
(user_id,),
)
- count, = txn.fetchone()
+ (count,) = txn.fetchone()
return count, pos
joined_rooms, pos = yield self.runInteraction(
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index cf66225574..fa03ca9ff7 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -260,9 +260,7 @@ class EventsPersistenceStorage(object):
self._event_persist_queue.handle_queue(room_id, persisting_queue)
@defer.inlineCallbacks
- def _persist_events(
- self, events_and_contexts, backfilled=False, delete_existing=False
- ):
+ def _persist_events(self, events_and_contexts, backfilled=False):
"""Calculates the change to current state and forward extremities, and
persists the given events and with those updates.
@@ -412,7 +410,6 @@ class EventsPersistenceStorage(object):
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
backfilled=backfilled,
- delete_existing=delete_existing,
)
@defer.inlineCallbacks
@@ -550,7 +547,7 @@ class EventsPersistenceStorage(object):
if missing_event_ids:
# Now pull out the state groups for any missing events from DB
- event_to_groups = yield self.state_store._get_state_group_for_events(
+ event_to_groups = yield self.main_store._get_state_group_for_events(
missing_event_ids
)
event_id_to_state_group.update(event_to_groups)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a2df8fa827..3735846899 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -19,6 +19,8 @@ from six import iteritems, itervalues
import attr
+from twisted.internet import defer
+
from synapse.api.constants import EventTypes
logger = logging.getLogger(__name__)
@@ -322,3 +324,234 @@ class StateFilter(object):
)
return member_filter, non_member_filter
+
+
+class StateGroupStorage(object):
+ """High level interface to fetching state for event.
+ """
+
+ def __init__(self, hs, stores):
+ self.stores = stores
+
+ def get_state_group_delta(self, state_group):
+ """Given a state group try to return a previous group and a delta between
+ the old and the new.
+
+ Returns:
+ Deferred[Tuple[Optional[int], Optional[list[dict[tuple[str, str], str]]]]]):
+ (prev_group, delta_ids)
+ """
+
+ return self.stores.main.get_state_group_delta(state_group)
+
+ @defer.inlineCallbacks
+ def get_state_groups_ids(self, _room_id, event_ids):
+ """Get the event IDs of all the state for the state groups for the given events
+
+ Args:
+ _room_id (str): id of the room for these events
+ event_ids (iterable[str]): ids of the events
+
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
+ if not event_ids:
+ return {}
+
+ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
+
+ groups = set(itervalues(event_to_groups))
+ group_to_state = yield self.stores.main._get_state_for_groups(groups)
+
+ return group_to_state
+
+ @defer.inlineCallbacks
+ def get_state_ids_for_group(self, state_group):
+ """Get the event IDs of all the state in the given state group
+
+ Args:
+ state_group (int)
+
+ Returns:
+ Deferred[dict]: Resolves to a map of (type, state_key) -> event_id
+ """
+ group_to_state = yield self._get_state_for_groups((state_group,))
+
+ return group_to_state[state_group]
+
+ @defer.inlineCallbacks
+ def get_state_groups(self, room_id, event_ids):
+ """ Get the state groups for the given list of event_ids
+ Returns:
+ Deferred[dict[int, list[EventBase]]]:
+ dict of state_group_id -> list of state events.
+ """
+ if not event_ids:
+ return {}
+
+ group_to_ids = yield self.get_state_groups_ids(room_id, event_ids)
+
+ state_event_map = yield self.stores.main.get_events(
+ [
+ ev_id
+ for group_ids in itervalues(group_to_ids)
+ for ev_id in itervalues(group_ids)
+ ],
+ get_prev_content=False,
+ )
+
+ return {
+ group: [
+ state_event_map[v]
+ for v in itervalues(event_id_map)
+ if v in state_event_map
+ ]
+ for group, event_id_map in iteritems(group_to_ids)
+ }
+
+ def _get_state_groups_from_groups(self, groups, state_filter):
+ """Returns the state groups for a given set of groups, filtering on
+ types of state events.
+
+ Args:
+ groups(list[int]): list of state group IDs to query
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
+
+ return self.stores.main._get_state_groups_from_groups(groups, state_filter)
+
+ @defer.inlineCallbacks
+ def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
+ """Given a list of event_ids and type tuples, return a list of state
+ dicts for each event.
+ Args:
+ event_ids (list[string])
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+ Returns:
+ deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
+ """
+ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
+
+ groups = set(itervalues(event_to_groups))
+ group_to_state = yield self.stores.main._get_state_for_groups(
+ groups, state_filter
+ )
+
+ state_event_map = yield self.stores.main.get_events(
+ [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
+ get_prev_content=False,
+ )
+
+ event_to_state = {
+ event_id: {
+ k: state_event_map[v]
+ for k, v in iteritems(group_to_state[group])
+ if v in state_event_map
+ }
+ for event_id, group in iteritems(event_to_groups)
+ }
+
+ return {event: event_to_state[event] for event in event_ids}
+
+ @defer.inlineCallbacks
+ def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
+ """
+ Get the state dicts corresponding to a list of events, containing the event_ids
+ of the state events (as opposed to the events themselves)
+
+ Args:
+ event_ids(list(str)): events whose state should be returned
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+
+ Returns:
+ A deferred dict from event_id -> (type, state_key) -> event_id
+ """
+ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
+
+ groups = set(itervalues(event_to_groups))
+ group_to_state = yield self.stores.main._get_state_for_groups(
+ groups, state_filter
+ )
+
+ event_to_state = {
+ event_id: group_to_state[group]
+ for event_id, group in iteritems(event_to_groups)
+ }
+
+ return {event: event_to_state[event] for event in event_ids}
+
+ @defer.inlineCallbacks
+ def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
+ """
+ Get the state dict corresponding to a particular event
+
+ Args:
+ event_id(str): event whose state should be returned
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+
+ Returns:
+ A deferred dict from (type, state_key) -> state_event
+ """
+ state_map = yield self.get_state_for_events([event_id], state_filter)
+ return state_map[event_id]
+
+ @defer.inlineCallbacks
+ def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
+ """
+ Get the state dict corresponding to a particular event
+
+ Args:
+ event_id(str): event whose state should be returned
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+
+ Returns:
+ A deferred dict from (type, state_key) -> state_event
+ """
+ state_map = yield self.get_state_ids_for_events([event_id], state_filter)
+ return state_map[event_id]
+
+ def _get_state_for_groups(self, groups, state_filter=StateFilter.all()):
+ """Gets the state at each of a list of state groups, optionally
+ filtering by type/state_key
+
+ Args:
+ groups (iterable[int]): list of state groups for which we want
+ to get the state.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
+ return self.stores.main._get_state_for_groups(groups, state_filter)
+
+ def store_state_group(
+ self, event_id, room_id, prev_group, delta_ids, current_state_ids
+ ):
+ """Store a new set of state, returning a newly assigned state group.
+
+ Args:
+ event_id (str): The event ID for which the state was calculated
+ room_id (str)
+ prev_group (int|None): A previous state group for the room, optional.
+ delta_ids (dict|None): The delta between state at `prev_group` and
+ `current_state_ids`, if `prev_group` was given. Same format as
+ `current_state_ids`.
+ current_state_ids (dict): The state to store. Map of (type, state_key)
+ to event_id.
+
+ Returns:
+ Deferred[int]: The state group ID
+ """
+ return self.stores.main.store_state_group(
+ event_id, room_id, prev_group, delta_ids, current_state_ids
+ )
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index cbb0a4810a..9d851beaa5 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -46,7 +46,7 @@ def _load_current_id(db_conn, table, column, step=1):
cur.execute("SELECT MAX(%s) FROM %s" % (column, table))
else:
cur.execute("SELECT MIN(%s) FROM %s" % (column, table))
- val, = cur.fetchone()
+ (val,) = cur.fetchone()
cur.close()
current_id = int(val) if val else step
return (max if step > 0 else min)(current_id, step)
|