summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/client_ips.py6
-rw-r--r--synapse/storage/data_stores/main/devices.py8
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py6
-rw-r--r--synapse/storage/data_stores/main/event_federation.py3
-rw-r--r--synapse/storage/data_stores/main/event_push_actions.py135
-rw-r--r--synapse/storage/data_stores/main/events.py30
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py4
-rw-r--r--synapse/storage/data_stores/main/events_worker.py41
-rw-r--r--synapse/storage/data_stores/main/presence.py41
-rw-r--r--synapse/storage/data_stores/main/push_rule.py56
-rw-r--r--synapse/storage/data_stores/main/receipts.py91
-rw-r--r--synapse/storage/data_stores/main/registration.py4
-rw-r--r--synapse/storage/data_stores/main/roommember.py10
-rw-r--r--synapse/storage/data_stores/main/schema/delta/30/as_users.py2
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql23
-rw-r--r--synapse/storage/data_stores/main/search.py4
-rw-r--r--synapse/storage/data_stores/main/stream.py2
-rw-r--r--synapse/storage/data_stores/main/tags.py2
-rw-r--r--synapse/storage/data_stores/main/ui_auth.py2
-rw-r--r--synapse/storage/data_stores/state/bg_updates.py6
-rw-r--r--synapse/storage/data_stores/state/store.py19
-rw-r--r--synapse/storage/database.py16
-rw-r--r--synapse/storage/persist_events.py9
-rw-r--r--synapse/storage/state.py38
24 files changed, 383 insertions, 175 deletions
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py

index ea06f74535..1c035d51cb 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py
@@ -15,8 +15,6 @@ import logging -from six import iteritems - from twisted.internet import defer from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -421,7 +419,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): ): self.database_engine.lock_table(txn, "user_ips") - for entry in iteritems(to_update): + for entry in to_update.items(): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry try: @@ -530,7 +528,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): "user_agent": user_agent, "last_seen": last_seen, } - for (access_token, ip), (user_agent, last_seen) in iteritems(results) + for (access_token, ip), (user_agent, last_seen) in results.items() ] @wrap_as_background_process("prune_old_user_ips") diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index fb9f798e29..0ff0542453 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py
@@ -17,8 +17,6 @@ import logging from typing import List, Optional, Set, Tuple -from six import iteritems - from canonicaljson import json from twisted.internet import defer @@ -208,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore): ) # add the updated cross-signing keys to the results list - for user_id, result in iteritems(cross_signing_keys_by_user): + for user_id, result in cross_signing_keys_by_user.items(): result["user_id"] = user_id # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec results.append(("org.matrix.signing_key_update", result)) @@ -269,7 +267,7 @@ class DeviceWorkerStore(SQLBaseStore): ) results = [] - for user_id, user_devices in iteritems(devices): + for user_id, user_devices in devices.items(): # The prev_id for the first row is always the last row before # `from_stream_id` prev_id = yield self._get_last_device_update_for_remote_user( @@ -493,7 +491,7 @@ class DeviceWorkerStore(SQLBaseStore): if devices: user_devices = devices[user_id] results = [] - for device_id, device in iteritems(user_devices): + for device_id, device in user_devices.items(): result = {"device_id": device_id} key_json = device.get("key_json", None) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 20698bfd16..1a0842d4b0 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -16,8 +16,6 @@ # limitations under the License. from typing import Dict, List -from six import iteritems - from canonicaljson import encode_canonical_json, json from twisted.enterprise.adbapi import Connection @@ -64,9 +62,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): # Build the result structure, un-jsonify the results, and add the # "unsigned" section rv = {} - for user_id, device_keys in iteritems(results): + for user_id, device_keys in results.items(): rv[user_id] = {} - for device_id, device_info in iteritems(device_keys): + for device_id, device_info in device_keys.items(): r = db_to_json(device_info.pop("key_json")) r["unsigned"] = {} display_name = device_info["device_display_name"] diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 24ce8c4330..a6bb3221ff 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,10 +14,9 @@ # limitations under the License. import itertools import logging +from queue import Empty, PriorityQueue from typing import Dict, List, Optional, Set, Tuple -from six.moves.queue import Empty, PriorityQueue - from twisted.internet import defer from synapse.api.errors import StoreError diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 0321274de2..ba1b33a0a9 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2015-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,9 +14,9 @@ # limitations under the License. import logging +from typing import Dict, Tuple -from six import iteritems - +import attr from canonicaljson import json from twisted.internet import defer @@ -38,6 +37,16 @@ DEFAULT_HIGHLIGHT_ACTION = [ ] +@attr.s +class EventPushSummary: + """Summary of pending event push actions for a given user in a given room.""" + + unread_count = attr.ib(type=int) + stream_ordering = attr.ib(type=int) + old_user_id = attr.ib(type=str) + notif_count = attr.ib(type=int) + + def _serialize_action(actions, is_highlight): """Custom serializer for actions. This allows us to "compress" common actions. @@ -124,25 +133,42 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): - # First get number of notifications. - # We don't need to put a notif=1 clause as all rows always have - # notif=1 + # First get number of actions, grouped on whether the action notifies. sql = ( - "SELECT count(*)" + "SELECT count(*), notif" " FROM event_push_actions ea" " WHERE" " user_id = ?" " AND room_id = ?" " AND stream_ordering > ?" + " GROUP BY notif" ) - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() - notify_count = row[0] if row else 0 + rows = txn.fetchall() + + # We should get a maximum number of two rows: one for notif = 0, which is the + # number of actions that contribute to the unread_count but not to the + # notify_count, and one for notif = 1, which is the number of actions that + # contribute to both counters. If one or both rows don't appear, then the + # value for the matching counter should be 0. + unread_count = 0 + notify_count = 0 + for row in rows: + # We always increment unread_count because actions that notify also + # contribute to it. + unread_count += row[0] + if row[1] == 1: + notify_count = row[0] + elif row[1] != 0: + logger.warning( + "Unexpected value %d for column 'notif' in table" + " 'event_push_actions'", + row[1], + ) txn.execute( """ - SELECT notif_count FROM event_push_summary + SELECT notif_count, unread_count FROM event_push_summary WHERE room_id = ? AND user_id = ? AND stream_ordering > ? """, (room_id, user_id, stream_ordering), @@ -150,6 +176,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): rows = txn.fetchall() if rows: notify_count += rows[0][0] + unread_count += rows[0][1] # Now get the number of highlights sql = ( @@ -166,7 +193,11 @@ class EventPushActionsWorkerStore(SQLBaseStore): row = txn.fetchone() highlight_count = row[0] if row else 0 - return {"notify_count": notify_count, "highlight_count": highlight_count} + return { + "unread_count": unread_count, + "notify_count": notify_count, + "highlight_count": highlight_count, + } @defer.inlineCallbacks def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): @@ -224,6 +255,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -252,6 +284,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -324,6 +357,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -352,6 +386,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -401,7 +436,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_if_maybe_push_in_range_for_user_txn(txn): sql = """ SELECT 1 FROM event_push_actions - WHERE user_id = ? AND stream_ordering > ? + WHERE user_id = ? AND stream_ordering > ? AND notif = 1 LIMIT 1 """ @@ -430,14 +465,15 @@ class EventPushActionsWorkerStore(SQLBaseStore): return # This is a helper function for generating the necessary tuple that - # can be used to inert into the `event_push_actions_staging` table. + # can be used to insert into the `event_push_actions_staging` table. def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 + notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1 return ( event_id, # event_id column user_id, # user_id column _serialize_action(actions, is_highlight), # actions column - 1, # notif column + notif, # notif column is_highlight, # highlight column ) @@ -455,7 +491,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): sql, ( _gen_entry(user_id, actions) - for user_id, actions in iteritems(user_id_actions) + for user_id, actions in user_id_actions.items() ), ) @@ -819,24 +855,51 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, - coalesce(old.notif_count, 0) + upd.notif_count, + coalesce(old.%s, 0) + upd.cnt, upd.stream_ordering, old.user_id FROM ( - SELECT user_id, room_id, count(*) as notif_count, + SELECT user_id, room_id, count(*) as cnt, max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0 + %s GROUP BY user_id, room_id ) AS upd LEFT JOIN event_push_summary AS old USING (user_id, room_id) """ - txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering)) - rows = txn.fetchall() + # First get the count of unread messages. + txn.execute( + sql % ("unread_count", ""), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) - logger.info("Rotating notifications, handling %d rows", len(rows)) + # We need to merge both lists into a single object because we might not have the + # same amount of rows in each of them. In this case we use a dict indexed on the + # user ID and room ID to make it easier to populate. + summaries = {} # type: Dict[Tuple[str, str], EventPushSummary] + for row in txn: + summaries[(row[0], row[1])] = EventPushSummary( + unread_count=row[2], + stream_ordering=row[3], + old_user_id=row[4], + notif_count=0, + ) + + # Then get the count of notifications. + txn.execute( + sql % ("notif_count", "AND notif = 1"), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) + + # notif_rows is populated based on a subset of the query used to populate + # unread_rows, so we can be sure that there will be no KeyError here. + for row in txn: + summaries[(row[0], row[1])].notif_count = row[2] + + logger.info("Rotating notifications, handling %d rows", len(summaries)) # If the `old.user_id` above is NULL then we know there isn't already an # entry in the table, so we simply insert it. Otherwise we update the @@ -846,22 +909,34 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_summary", values=[ { - "user_id": row[0], - "room_id": row[1], - "notif_count": row[2], - "stream_ordering": row[3], + "user_id": user_id, + "room_id": room_id, + "notif_count": summary.notif_count, + "unread_count": summary.unread_count, + "stream_ordering": summary.stream_ordering, } - for row in rows - if row[4] is None + for ((user_id, room_id), summary) in summaries.items() + if summary.old_user_id is None ], ) txn.executemany( """ - UPDATE event_push_summary SET notif_count = ?, stream_ordering = ? + UPDATE event_push_summary + SET notif_count = ?, unread_count = ?, stream_ordering = ? WHERE user_id = ? AND room_id = ? """, - ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None), + ( + ( + summary.notif_count, + summary.unread_count, + summary.stream_ordering, + user_id, + room_id, + ) + for ((user_id, room_id), summary) in summaries.items() + if summary.old_user_id is not None + ), ) txn.execute( diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a6572571b4..cfd24d2f06 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py
@@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple from functools import wraps from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple -from six import integer_types, iteritems, text_type -from six.moves import range - import attr from canonicaljson import json from prometheus_client import Counter @@ -232,10 +229,10 @@ class PersistEventsStore: event_counter.labels(event.type, origin_type, origin_entity).inc() - for room_id, new_state in iteritems(current_state_for_room): + for room_id, new_state in current_state_for_room.items(): self.store.get_current_state_ids.prefill((room_id,), new_state) - for room_id, latest_event_ids in iteritems(new_forward_extremeties): + for room_id, latest_event_ids in new_forward_extremeties.items(): self.store.get_latest_event_ids_in_room.prefill( (room_id,), list(latest_event_ids) ) @@ -461,7 +458,7 @@ class PersistEventsStore: state_delta_by_room: Dict[str, DeltaState], stream_id: int, ): - for room_id, delta_state in iteritems(state_delta_by_room): + for room_id, delta_state in state_delta_by_room.items(): to_delete = delta_state.to_delete to_insert = delta_state.to_insert @@ -545,7 +542,7 @@ class PersistEventsStore: """, [ (room_id, key[0], key[1], ev_id, ev_id) - for key, ev_id in iteritems(to_insert) + for key, ev_id in to_insert.items() ], ) @@ -642,7 +639,7 @@ class PersistEventsStore: def _update_forward_extremities_txn( self, txn, new_forward_extremities, max_stream_order ): - for room_id, new_extrem in iteritems(new_forward_extremities): + for room_id, new_extrem in new_forward_extremities.items(): self.db.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) @@ -655,7 +652,7 @@ class PersistEventsStore: table="event_forward_extremities", values=[ {"event_id": ev_id, "room_id": room_id} - for room_id, new_extrem in iteritems(new_forward_extremities) + for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], ) @@ -672,7 +669,7 @@ class PersistEventsStore: "event_id": event_id, "stream_ordering": max_stream_order, } - for room_id, new_extrem in iteritems(new_forward_extremities) + for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ], ) @@ -727,7 +724,7 @@ class PersistEventsStore: event.depth, depth_updates.get(event.room_id, event.depth) ) - for room_id, depth in iteritems(depth_updates): + for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) def _update_outliers_txn(self, txn, events_and_contexts): @@ -893,8 +890,7 @@ class PersistEventsStore: "received_ts": self._clock.time_msec(), "sender": event.sender, "contains_url": ( - "url" in event.content - and isinstance(event.content["url"], text_type) + "url" in event.content and isinstance(event.content["url"], str) ), } for event, _ in events_and_contexts @@ -1345,10 +1341,10 @@ class PersistEventsStore: ): if ( "min_lifetime" in event.content - and not isinstance(event.content.get("min_lifetime"), integer_types) + and not isinstance(event.content.get("min_lifetime"), int) ) or ( "max_lifetime" in event.content - and not isinstance(event.content.get("max_lifetime"), integer_types) + and not isinstance(event.content.get("max_lifetime"), int) ): # Ignore the event if one of the value isn't an integer. return @@ -1497,11 +1493,11 @@ class PersistEventsStore: table="event_to_state_groups", values=[ {"state_group": state_group_id, "event_id": event_id} - for event_id, state_group_id in iteritems(state_groups) + for event_id, state_group_id in state_groups.items() ], ) - for event_id, state_group_id in iteritems(state_groups): + for event_id, state_group_id in state_groups.items(): txn.call_after( self.store._get_state_group_for_event.prefill, (event_id,), diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index f54c8b1ee0..62d28f44dc 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,8 +15,6 @@ import logging -from six import text_type - from canonicaljson import json from twisted.internet import defer @@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): contains_url = "url" in content if contains_url: - contains_url &= isinstance(content["url"], text_type) + contains_url &= isinstance(content["url"], str) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore): "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn ) - def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + async def get_all_new_backfill_event_rows( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for backfill replication stream, including all new + backfilled events and events that have gone from being outliers to not. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_new_backfill_event_rows(txn): sql = ( @@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (-last_id, -current_id, limit)) - new_event_updates = txn.fetchall() + new_event_updates = [(row[0], row[1:]) for row in txn] + limited = False if len(new_event_updates) == limit: upper_bound = new_event_updates[-1][0] + limited = True else: upper_bound = current_id @@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore): " ORDER BY event_stream_ordering DESC" ) txn.execute(sql, (-last_id, -upper_bound)) - new_event_updates.extend(txn.fetchall()) + new_event_updates.extend((row[0], row[1:]) for row in txn) - return new_event_updates + if len(new_event_updates) >= limit: + upper_bound = new_event_updates[-1][0] + limited = True - return self.db.runInteraction( + return new_event_updates, upper_bound, limited + + return await self.db.runInteraction( "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows ) diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Tuple + from twisted.internet import defer from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause @@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore): ) txn.execute(sql + clause, [stream_id] + list(args)) - def get_all_presence_updates(self, last_id, current_id, limit): + async def get_all_presence_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for presence replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_presence_updates_txn(txn): sql = """ @@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + updates = [(row[0], row[1:]) for row in txn] + + upper_bound = current_id + limited = False + if len(updates) >= limit: + upper_bound = updates[-1][0] + limited = True + + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_presence_updates", get_all_presence_updates_txn ) diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@ import abc import logging -from typing import Union +from typing import List, Tuple, Union from canonicaljson import json @@ -348,23 +348,53 @@ class PushRulesWorkerStore( results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled return results - def get_all_push_rule_updates(self, last_id, current_id, limit): - """Get all the push rules changes that have happend on the server""" + async def get_all_push_rule_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for push_rules replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_push_rule_updates_txn(txn): - sql = ( - "SELECT stream_id, event_stream_ordering, user_id, rule_id," - " op, priority_class, priority, conditions, actions" - " FROM push_rules_stream" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) + sql = """ + SELECT stream_id, user_id + FROM push_rules_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + updates = [(stream_id, (user_id,)) for stream_id, user_id in txn] + + limited = False + upper_bound = current_id + if len(updates) == limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_push_rule_updates", get_all_push_rule_updates_txn ) diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index cebdcd409f..8f5505bd67 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@ import abc import logging +from typing import List, Tuple from canonicaljson import json @@ -24,6 +25,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore): } return results - def get_all_updated_receipts(self, last_id, current_id, limit=None): + def get_users_sent_receipts_between(self, last_id: int, current_id: int): + """Get all users who sent receipts between `last_id` exclusive and + `current_id` inclusive. + + Returns: + Deferred[List[str]] + """ + if last_id == current_id: return defer.succeed([]) - def get_all_updated_receipts_txn(txn): - sql = ( - "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" - " FROM receipts_linearized" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - ) - args = [last_id, current_id] - if limit is not None: - sql += " LIMIT ?" - args.append(limit) - txn.execute(sql, args) + def _get_users_sent_receipts_between_txn(txn): + sql = """ + SELECT DISTINCT user_id FROM receipts_linearized + WHERE ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (last_id, current_id)) - return [r[0:5] + (json.loads(r[5]),) for r in txn] + return [r[0] for r in txn] return self.db.runInteraction( + "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn + ) + + async def get_all_updated_receipts( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for receipts replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_all_updated_receipts_txn(txn): + sql = """ + SELECT stream_id, room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + + updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn] + + limited = False + upper_bound = current_id + + if len(updates) == limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited + + return await self.db.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn ) @@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore): room_id, None, update_metrics=False ) - # first handle the Deferred case - if isinstance(res, defer.Deferred): - if res.called: - res = res.result + # first handle the ObservableDeferred case + if isinstance(res, ObservableDeferred): + if res.has_called(): + res = res.get_result() else: res = None diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 9768981891..587d4b91c1 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py
@@ -19,8 +19,6 @@ import logging import re from typing import Optional -from six import iterkeys - from twisted.internet import defer from twisted.internet.defer import Deferred @@ -753,7 +751,7 @@ class RegistrationWorkerStore(SQLBaseStore): last_send_attempt, validated_at FROM threepid_validation_session WHERE %s """ % ( - " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)), + " AND ".join("%s = ?" % k for k in keyvalues.keys()), ) if validated is not None: diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 137ebac833..44bab65eac 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py
@@ -17,8 +17,6 @@ import logging from typing import Iterable, List, Set -from six import iteritems, itervalues - from canonicaljson import json from twisted.internet import defer @@ -544,7 +542,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = {} member_event_ids = [ e_id - for key, e_id in iteritems(current_state_ids) + for key, e_id in current_state_ids.items() if key[0] == EventTypes.Member ] @@ -561,7 +559,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): users_in_room = dict(prev_res) member_event_ids = [ e_id - for key, e_id in iteritems(context.delta_ids) + for key, e_id in context.delta_ids.items() if key[0] == EventTypes.Member ] for etype, state_key in context.delta_ids: @@ -1101,7 +1099,7 @@ class _JoinedHostsCache(object): if state_entry.state_group == self.state_group: pass elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in iteritems(state_entry.delta_ids): + for (typ, state_key), event_id in state_entry.delta_ids.items(): if typ != EventTypes.Member: continue @@ -1131,7 +1129,7 @@ class _JoinedHostsCache(object): self.state_group = state_entry.state_group else: self.state_group = object() - self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users)) + self._len = sum(len(v) for v in self.hosts_to_joined_users.values()) return frozenset(self.hosts_to_joined_users) def __len__(self): diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
index 9b95411fb6..b42c02710a 100644 --- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py +++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
@@ -13,8 +13,6 @@ # limitations under the License. import logging -from six.moves import range - from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) diff --git a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql new file mode 100644
index 0000000000..f1459ef7f0 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql
@@ -0,0 +1,23 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Store the number of unread messages, i.e. messages that triggered either a notify +-- action or a mark_unread one. +ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0; + +-- Pre-populate the new column with the count of pending notifications. +-- We expect event_push_summary to be relatively small, so we can do this update +-- synchronously without impacting Synapse's startup time too much. +UPDATE event_push_summary SET unread_count = notif_count; \ No newline at end of file diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 3c38b1ac50..9d6540d142 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py
@@ -17,8 +17,6 @@ import logging import re from collections import namedtuple -from six import string_types - from canonicaljson import json from twisted.internet import defer @@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # skip over it. continue - if not isinstance(value, string_types): + if not isinstance(value, str): # If the event body, name or topic isn't a string # then skip over it continue diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index e89f0bffb5..379d758b5d 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py
@@ -40,8 +40,6 @@ import abc import logging from collections import namedtuple -from six.moves import range - from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 4219018302..f8c776be3f 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py
@@ -16,8 +16,6 @@ import logging -from six.moves import range - from canonicaljson import json from twisted.internet import defer diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index 1d8ee22fb1..ec2f38c373 100644 --- a/synapse/storage/data_stores/main/ui_auth.py +++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -186,7 +186,7 @@ class UIAuthWorkerStore(SQLBaseStore): # The clientdict gets stored as JSON. clientdict_json = json.dumps(clientdict) - self.db.simple_update_one( + await self.db.simple_update_one( table="ui_auth_sessions", keyvalues={"session_id": session_id}, updatevalues={"clientdict": clientdict_json}, diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index ff000bc9ec..be1fe97d79 100644 --- a/synapse/storage/data_stores/state/bg_updates.py +++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -15,8 +15,6 @@ import logging -from six import iteritems - from twisted.internet import defer from synapse.storage._base import SQLBaseStore @@ -280,7 +278,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): delta_state = { key: value - for key, value in iteritems(curr_state) + for key, value in curr_state.items() if prev_state.get(key, None) != value } @@ -316,7 +314,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(delta_state) + for key, state_id in delta_state.items() ], ) diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index f3ad1e4369..5db9f20135 100644 --- a/synapse/storage/data_stores/state/store.py +++ b/synapse/storage/data_stores/state/store.py
@@ -17,9 +17,6 @@ import logging from collections import namedtuple from typing import Dict, Iterable, List, Set, Tuple -from six import iteritems -from six.moves import range - from twisted.internet import defer from synapse.api.constants import EventTypes @@ -263,7 +260,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # And finally update the result dict, by filtering out any extra # stuff we pulled out of the database. - for group, group_state_dict in iteritems(group_to_state_dict): + for group, group_state_dict in group_to_state_dict.items(): # We just replace any existing entries, as we will have loaded # everything we need from the database anyway. state[group] = state_filter.filter_state(group_state_dict) @@ -341,11 +338,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): else: non_member_types = non_member_filter.concrete_types() - for group, group_state_dict in iteritems(group_to_state_dict): + for group, group_state_dict in group_to_state_dict.items(): state_dict_members = {} state_dict_non_members = {} - for k, v in iteritems(group_state_dict): + for k, v in group_state_dict.items(): if k[0] == EventTypes.Member: state_dict_members[k] = v else: @@ -432,7 +429,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(delta_ids) + for key, state_id in delta_ids.items() ], ) else: @@ -447,7 +444,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(current_state_ids) + for key, state_id in current_state_ids.items() ], ) @@ -458,7 +455,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): current_member_state_ids = { s: ev - for (s, ev) in iteritems(current_state_ids) + for (s, ev) in current_state_ids.items() if s[0] == EventTypes.Member } txn.call_after( @@ -470,7 +467,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): current_non_member_state_ids = { s: ev - for (s, ev) in iteritems(current_state_ids) + for (s, ev) in current_state_ids.items() if s[0] != EventTypes.Member } txn.call_after( @@ -555,7 +552,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in iteritems(curr_state) + for key, state_id in curr_state.items() ], ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b112ff3df2..3be20c866a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -16,6 +16,7 @@ # limitations under the License. import logging import time +from sys import intern from time import monotonic as monotonic_time from typing import ( Any, @@ -29,9 +30,6 @@ from typing import ( TypeVar, ) -from six import iteritems, iterkeys, itervalues -from six.moves import intern, range - from prometheus_client import Histogram from twisted.enterprise import adbapi @@ -259,7 +257,7 @@ class PerformanceCounters(object): def interval(self, interval_duration_secs, limit=3): counters = [] - for name, (count, cum_time) in iteritems(self.current_counters): + for name, (count, cum_time) in self.current_counters.items(): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append( ( @@ -1053,7 +1051,7 @@ class Database(object): sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table} if keyvalues: - sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) txn.execute(sql, list(keyvalues.values())) else: txn.execute(sql) @@ -1191,7 +1189,7 @@ class Database(object): clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) clauses = [clause] - for key, value in iteritems(keyvalues): + for key, value in keyvalues.items(): clauses.append("%s = ?" % (key,)) values.append(value) @@ -1212,7 +1210,7 @@ class Database(object): @staticmethod def simple_update_txn(txn, table, keyvalues, updatevalues): if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) else: where = "" @@ -1351,7 +1349,7 @@ class Database(object): clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) clauses = [clause] - for key, value in iteritems(keyvalues): + for key, value in keyvalues.items(): clauses.append("%s = ?" % (key,)) values.append(value) @@ -1388,7 +1386,7 @@ class Database(object): txn.close() if cache: - min_val = min(itervalues(cache)) + min_val = min(cache.values()) else: min_val = max_value diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index f159400a87..ec894a91cb 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py
@@ -20,9 +20,6 @@ import logging from collections import deque, namedtuple from typing import Iterable, List, Optional, Set, Tuple -from six import iteritems -from six.moves import range - from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -218,7 +215,7 @@ class EventsPersistenceStorage(object): partitioned.setdefault(event.room_id, []).append((event, ctx)) deferreds = [] - for room_id, evs_ctxs in iteritems(partitioned): + for room_id, evs_ctxs in partitioned.items(): d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled ) @@ -319,7 +316,7 @@ class EventsPersistenceStorage(object): (event, context) ) - for room_id, ev_ctx_rm in iteritems(events_by_room): + for room_id, ev_ctx_rm in events_by_room.items(): latest_event_ids = await self.main_store.get_latest_event_ids_in_room( room_id ) @@ -674,7 +671,7 @@ class EventsPersistenceStorage(object): to_insert = { key: ev_id - for key, ev_id in iteritems(current_state) + for key, ev_id in current_state.items() if ev_id != existing_state.get(key) } diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c522c80922..dc568476f4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py
@@ -16,8 +16,6 @@ import logging from typing import Iterable, List, TypeVar -from six import iteritems, itervalues - import attr from twisted.internet import defer @@ -51,7 +49,7 @@ class StateFilter(object): # If `include_others` is set we canonicalise the filter by removing # wildcards from the types dictionary if self.include_others: - self.types = {k: v for k, v in iteritems(self.types) if v is not None} + self.types = {k: v for k, v in self.types.items() if v is not None} @staticmethod def all(): @@ -150,7 +148,7 @@ class StateFilter(object): has_non_member_wildcard = self.include_others or any( state_keys is None - for t, state_keys in iteritems(self.types) + for t, state_keys in self.types.items() if t != EventTypes.Member ) @@ -199,7 +197,7 @@ class StateFilter(object): # First we build up a lost of clauses for each type/state_key combo clauses = [] - for etype, state_keys in iteritems(self.types): + for etype, state_keys in self.types.items(): if state_keys is None: clauses.append("(type = ?)") where_args.append(etype) @@ -251,7 +249,7 @@ class StateFilter(object): return dict(state_dict) filtered_state = {} - for k, v in iteritems(state_dict): + for k, v in state_dict.items(): typ, state_key = k if typ in self.types: state_keys = self.types[typ] @@ -279,7 +277,7 @@ class StateFilter(object): """ return self.include_others or any( - state_keys is None for state_keys in itervalues(self.types) + state_keys is None for state_keys in self.types.values() ) def concrete_types(self): @@ -292,7 +290,7 @@ class StateFilter(object): """ return [ (t, s) - for t, state_keys in iteritems(self.types) + for t, state_keys in self.types.items() if state_keys is not None for s in state_keys ] @@ -324,7 +322,7 @@ class StateFilter(object): member_filter = StateFilter.none() non_member_filter = StateFilter( - types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member}, + types={k: v for k, v in self.types.items() if k != EventTypes.Member}, include_others=self.include_others, ) @@ -366,7 +364,7 @@ class StateGroupStorage(object): event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids) - groups = set(itervalues(event_to_groups)) + groups = set(event_to_groups.values()) group_to_state = yield self.stores.state._get_state_for_groups(groups) return group_to_state @@ -400,8 +398,8 @@ class StateGroupStorage(object): state_event_map = yield self.stores.main.get_events( [ ev_id - for group_ids in itervalues(group_to_ids) - for ev_id in itervalues(group_ids) + for group_ids in group_to_ids.values() + for ev_id in group_ids.values() ], get_prev_content=False, ) @@ -409,10 +407,10 @@ class StateGroupStorage(object): return { group: [ state_event_map[v] - for v in itervalues(event_id_map) + for v in event_id_map.values() if v in state_event_map ] - for group, event_id_map in iteritems(group_to_ids) + for group, event_id_map in group_to_ids.items() } def _get_state_groups_from_groups( @@ -444,23 +442,23 @@ class StateGroupStorage(object): """ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids) - groups = set(itervalues(event_to_groups)) + groups = set(event_to_groups.values()) group_to_state = yield self.stores.state._get_state_for_groups( groups, state_filter ) state_event_map = yield self.stores.main.get_events( - [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], + [ev_id for sd in group_to_state.values() for ev_id in sd.values()], get_prev_content=False, ) event_to_state = { event_id: { k: state_event_map[v] - for k, v in iteritems(group_to_state[group]) + for k, v in group_to_state[group].items() if v in state_event_map } - for event_id, group in iteritems(event_to_groups) + for event_id, group in event_to_groups.items() } return {event: event_to_state[event] for event in event_ids} @@ -481,14 +479,14 @@ class StateGroupStorage(object): """ event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids) - groups = set(itervalues(event_to_groups)) + groups = set(event_to_groups.values()) group_to_state = yield self.stores.state._get_state_for_groups( groups, state_filter ) event_to_state = { event_id: group_to_state[group] - for event_id, group in iteritems(event_to_groups) + for event_id, group in event_to_groups.items() } return {event: event_to_state[event] for event in event_ids}