diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/events.py | 19 | ||||
-rw-r--r-- | synapse/storage/persist_events.py | 13 |
3 files changed, 26 insertions, 10 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a58187a76f..a6429d17ed 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -29,7 +29,7 @@ stored in `synapse.storage.schema`. from synapse.storage.data_stores import DataStores from synapse.storage.data_stores.main import DataStore -from synapse.storage.persist_events import EventsPersistenceStore +from synapse.storage.persist_events import EventsPersistenceStorage __all__ = ["DataStores", "DataStore"] @@ -44,7 +44,7 @@ class Storage(object): # interfaces. self.main = stores.main - self.persistence = EventsPersistenceStore(hs, stores) + self.persistence = EventsPersistenceStorage(hs, stores) def are_all_users_on_domain(txn, database_engine, domain): diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 6304531cd5..813f34528c 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -146,7 +146,7 @@ class EventsStore( @_retry_on_integrity_error @defer.inlineCallbacks - def _persist_events( + def _persist_events_and_state_updates( self, events_and_contexts, current_state_for_room, @@ -155,18 +155,27 @@ class EventsStore( backfilled=False, delete_existing=False, ): - """Persist events to db + """Persist a set of events alongside updates to the current state and + forward extremities tables. Args: events_and_contexts (list[(EventBase, EventContext)]): - backfilled (bool): + current_state_for_room (dict[str, dict]): Map from room_id to the + current state of the room based on forward extremities + state_delta_for_room (dict[str, tuple]): Map from room_id to tuple + of `(to_delete, to_insert)` where to_delete is a list + of type/state keys to remove from current state, and to_insert + is a map (type,key)->event_id giving the state delta in each + room. + new_forward_extremities (dict[str, list[str]]): Map from room_id + to list of event IDs that are the new forward extremities of + the room. + backfilled (bool) delete_existing (bool): Returns: Deferred: resolves when the events have been persisted """ - if not events_and_contexts: - return # We want to calculate the stream orderings as late as possible, as # we only notify after all events with a lesser stream ordering have diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 9a63953d4d..cf66225574 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -171,7 +171,13 @@ class _EventPeristenceQueue(object): pass -class EventsPersistenceStore(object): +class EventsPersistenceStorage(object): + """High level interface for handling persisting newly received events. + + Takes care of batching up events by room, and calculating the necessary + current state and forward extremity changes. + """ + def __init__(self, hs, stores: DataStores): # We ultimately want to split out the state store from the main store, # so we use separate variables here even though they point to the same @@ -257,7 +263,8 @@ class EventsPersistenceStore(object): def _persist_events( self, events_and_contexts, backfilled=False, delete_existing=False ): - """Persist events to db + """Calculates the change to current state and forward extremities, and + persists the given events and with those updates. Args: events_and_contexts (list[(EventBase, EventContext)]): @@ -399,7 +406,7 @@ class EventsPersistenceStore(object): if current_state is not None: current_state_for_room[room_id] = current_state - yield self.main_store._persist_events( + yield self.main_store._persist_events_and_state_updates( chunk, current_state_for_room=current_state_for_room, state_delta_for_room=state_delta_for_room, |