diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a58187a76f..a6429d17ed 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,7 +29,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore
-from synapse.storage.persist_events import EventsPersistenceStore
+from synapse.storage.persist_events import EventsPersistenceStorage
__all__ = ["DataStores", "DataStore"]
@@ -44,7 +44,7 @@ class Storage(object):
# interfaces.
self.main = stores.main
- self.persistence = EventsPersistenceStore(hs, stores)
+ self.persistence = EventsPersistenceStorage(hs, stores)
def are_all_users_on_domain(txn, database_engine, domain):
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 6304531cd5..813f34528c 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -146,7 +146,7 @@ class EventsStore(
@_retry_on_integrity_error
@defer.inlineCallbacks
- def _persist_events(
+ def _persist_events_and_state_updates(
self,
events_and_contexts,
current_state_for_room,
@@ -155,18 +155,27 @@ class EventsStore(
backfilled=False,
delete_existing=False,
):
- """Persist events to db
+ """Persist a set of events alongside updates to the current state and
+ forward extremities tables.
Args:
events_and_contexts (list[(EventBase, EventContext)]):
- backfilled (bool):
+ current_state_for_room (dict[str, dict]): Map from room_id to the
+ current state of the room based on forward extremities
+ state_delta_for_room (dict[str, tuple]): Map from room_id to tuple
+ of `(to_delete, to_insert)` where to_delete is a list
+ of type/state keys to remove from current state, and to_insert
+ is a map (type,key)->event_id giving the state delta in each
+ room.
+ new_forward_extremities (dict[str, list[str]]): Map from room_id
+ to list of event IDs that are the new forward extremities of
+ the room.
+ backfilled (bool)
delete_existing (bool):
Returns:
Deferred: resolves when the events have been persisted
"""
- if not events_and_contexts:
- return
# We want to calculate the stream orderings as late as possible, as
# we only notify after all events with a lesser stream ordering have
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 9a63953d4d..cf66225574 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -171,7 +171,13 @@ class _EventPeristenceQueue(object):
pass
-class EventsPersistenceStore(object):
+class EventsPersistenceStorage(object):
+ """High level interface for handling persisting newly received events.
+
+ Takes care of batching up events by room, and calculating the necessary
+ current state and forward extremity changes.
+ """
+
def __init__(self, hs, stores: DataStores):
# We ultimately want to split out the state store from the main store,
# so we use separate variables here even though they point to the same
@@ -257,7 +263,8 @@ class EventsPersistenceStore(object):
def _persist_events(
self, events_and_contexts, backfilled=False, delete_existing=False
):
- """Persist events to db
+ """Calculates the change to current state and forward extremities, and
+ persists the given events and with those updates.
Args:
events_and_contexts (list[(EventBase, EventContext)]):
@@ -399,7 +406,7 @@ class EventsPersistenceStore(object):
if current_state is not None:
current_state_for_room[room_id] = current_state
- yield self.main_store._persist_events(
+ yield self.main_store._persist_events_and_state_updates(
chunk,
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
|