diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index f95679ebc4..3ec4d1d9c2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import itertools
import logging
import threading
@@ -74,6 +73,13 @@ class EventRedactBehaviour(Names):
class EventsWorkerStore(SQLBaseStore):
+    # Whether to use dedicated DB threads for event fetching. This is only used
+    # if there are multiple DB threads available. When enabled, event fetching
+    # will lock a DB thread for periods of time, so unit tests that run DB
+    # transactions on the main thread should disable it. See EVENT_QUEUE_* for
+    # more options controlling this behaviour.
+ USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING = True
+
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
@@ -130,6 +136,15 @@ class EventsWorkerStore(SQLBaseStore):
db_conn, "events", "stream_ordering", step=-1
)
+ if not hs.config.worker.worker_app:
+ # We periodically clean out old transaction ID mappings
+ self._clock.looping_call(
+ run_as_background_process,
+ 5 * 60 * 1000,
+ "_cleanup_old_transaction_ids",
+ self._cleanup_old_transaction_ids,
+ )
+
self._get_event_cache = Cache(
"*getEvent*",
keylen=3,
@@ -522,7 +537,11 @@ class EventsWorkerStore(SQLBaseStore):
if not event_list:
single_threaded = self.database_engine.single_threaded
- if single_threaded or i > EVENT_QUEUE_ITERATIONS:
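+                    # Stop this dedicated fetcher if such threads are disabled,
+                    # the database engine is single-threaded, or we have polled
+                    # too many times without finding any work.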
+ if (
+ not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
+ or single_threaded
+ or i > EVENT_QUEUE_ITERATIONS
+ ):
self._event_fetch_ongoing -= 1
return
else:
@@ -712,6 +731,7 @@ class EventsWorkerStore(SQLBaseStore):
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
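+            # Attach the stream ordering from the database row to the event's
+            # internal metadata.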
+ original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
event_map[event_id] = original_ev
@@ -779,6 +799,8 @@ class EventsWorkerStore(SQLBaseStore):
* event_id (str)
+ * stream_ordering (int): stream ordering for this event
+
* json (str): json-encoded event structure
* internal_metadata (str): json-encoded internal metadata dict
@@ -811,13 +833,15 @@ class EventsWorkerStore(SQLBaseStore):
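+        # Select from `events` joined to `event_json` (rather than from
+        # `event_json` alone) so that we can also pull out each event's
+        # stream ordering.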
sql = """\
SELECT
e.event_id,
- e.internal_metadata,
- e.json,
- e.format_version,
+ e.stream_ordering,
+ ej.internal_metadata,
+ ej.json,
+ ej.format_version,
r.room_version,
rej.reason
- FROM event_json as e
- LEFT JOIN rooms r USING (room_id)
+ FROM events AS e
+ JOIN event_json AS ej USING (event_id)
+ LEFT JOIN rooms r ON r.room_id = e.room_id
LEFT JOIN rejections as rej USING (event_id)
WHERE """
@@ -831,11 +855,12 @@ class EventsWorkerStore(SQLBaseStore):
event_id = row[0]
event_dict[event_id] = {
"event_id": event_id,
- "internal_metadata": row[1],
- "json": row[2],
- "format_version": row[3],
- "room_version_id": row[4],
- "rejected_reason": row[5],
+ "stream_ordering": row[1],
+ "internal_metadata": row[2],
+ "json": row[3],
+ "format_version": row[4],
+ "room_version_id": row[5],
+ "rejected_reason": row[6],
"redactions": [],
}
@@ -1017,16 +1042,12 @@ class EventsWorkerStore(SQLBaseStore):
return {"v1": complexity_v1}
- def get_current_backfill_token(self):
- """The current minimum token that backfilled events have reached"""
- return -self._backfill_id_gen.get_current_token()
-
def get_current_events_token(self):
"""The current maximum token that events have reached"""
return self._stream_id_gen.get_current_token()
async def get_all_new_forward_event_rows(
- self, last_id: int, current_id: int, limit: int
+ self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple]:
"""Returns new events, for the Events replication stream
@@ -1050,10 +1071,11 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
+ " AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
- txn.execute(sql, (last_id, current_id, limit))
+ txn.execute(sql, (last_id, current_id, instance_name, limit))
return txn.fetchall()
return await self.db_pool.runInteraction(
@@ -1061,7 +1083,7 @@ class EventsWorkerStore(SQLBaseStore):
)
async def get_ex_outlier_stream_rows(
- self, last_id: int, current_id: int
+ self, instance_name: str, last_id: int, current_id: int
) -> List[Tuple]:
"""Returns de-outliered events, for the Events replication stream
@@ -1080,16 +1102,17 @@ class EventsWorkerStore(SQLBaseStore):
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" FROM events AS e"
- " INNER JOIN ex_outlier_stream USING (event_id)"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
+ " AND out.instance_name = ?"
" ORDER BY event_stream_ordering ASC"
)
- txn.execute(sql, (last_id, current_id))
+ txn.execute(sql, (last_id, current_id, instance_name))
return txn.fetchall()
return await self.db_pool.runInteraction(
@@ -1102,6 +1125,9 @@ class EventsWorkerStore(SQLBaseStore):
"""Get updates for backfill replication stream, including all new
backfilled events and events that have gone from being outliers to not.
+ NOTE: The IDs given here are from replication, and so should be
+ *positive*.
+
Args:
-            instance_name: The writer we want to fetch updates from. Unused
-                here since there is only ever one writer.
+            instance_name: The writer we want to fetch updates from.
@@ -1132,10 +1158,11 @@ class EventsWorkerStore(SQLBaseStore):
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
+ " AND instance_name = ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
- txn.execute(sql, (-last_id, -current_id, limit))
+ txn.execute(sql, (-last_id, -current_id, instance_name, limit))
new_event_updates = [(row[0], row[1:]) for row in txn]
limited = False
@@ -1149,15 +1176,16 @@ class EventsWorkerStore(SQLBaseStore):
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" FROM events AS e"
- " INNER JOIN ex_outlier_stream USING (event_id)"
+ " INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
+ " AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC"
)
- txn.execute(sql, (-last_id, -upper_bound))
+ txn.execute(sql, (-last_id, -upper_bound, instance_name))
new_event_updates.extend((row[0], row[1:]) for row in txn)
if len(new_event_updates) >= limit:
@@ -1171,7 +1199,7 @@ class EventsWorkerStore(SQLBaseStore):
)
async def get_all_updated_current_state_deltas(
- self, from_token: int, to_token: int, target_row_count: int
+ self, instance_name: str, from_token: int, to_token: int, target_row_count: int
) -> Tuple[List[Tuple], int, bool]:
"""Fetch updates from current_state_delta_stream
@@ -1197,9 +1225,10 @@ class EventsWorkerStore(SQLBaseStore):
SELECT stream_id, room_id, type, state_key, event_id
FROM current_state_delta_stream
WHERE ? < stream_id AND stream_id <= ?
+ AND instance_name = ?
ORDER BY stream_id ASC LIMIT ?
"""
- txn.execute(sql, (from_token, to_token, target_row_count))
+ txn.execute(sql, (from_token, to_token, instance_name, target_row_count))
return txn.fetchall()
def get_deltas_for_stream_id_txn(txn, stream_id):
@@ -1287,3 +1316,76 @@ class EventsWorkerStore(SQLBaseStore):
return await self.db_pool.runInteraction(
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)
+
+ async def get_event_id_from_transaction_id(
+ self, room_id: str, user_id: str, token_id: int, txn_id: str
+ ) -> Optional[str]:
+ """Look up if we have already persisted an event for the transaction ID,
+ returning the event ID if so.
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ table="event_txn_id",
+ keyvalues={
+ "room_id": room_id,
+ "user_id": user_id,
+ "token_id": token_id,
+ "txn_id": txn_id,
+ },
+ retcol="event_id",
+ allow_none=True,
+ desc="get_event_id_from_transaction_id",
+ )
+
+ async def get_already_persisted_events(
+ self, events: Iterable[EventBase]
+ ) -> Dict[str, str]:
+ """Look up if we have already persisted an event for the transaction ID,
+ returning a mapping from event ID in the given list to the event ID of
+ an existing event.
+
+        Also checks for duplicates within the given events; any duplicates are
+        mapped to the *first* event.
+ """
+
+ mapping = {}
+ txn_id_to_event = {} # type: Dict[Tuple[str, int, str], str]
+
+ for event in events:
+ token_id = getattr(event.internal_metadata, "token_id", None)
+ txn_id = getattr(event.internal_metadata, "txn_id", None)
+
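+            # We can only de-duplicate when we know both the access token ID
+            # and the client-supplied transaction ID for the event.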
+ if token_id and txn_id:
+ # Check if this is a duplicate of an event in the given events.
+ existing = txn_id_to_event.get((event.room_id, token_id, txn_id))
+ if existing:
+ mapping[event.event_id] = existing
+ continue
+
+ # Check if this is a duplicate of an event we've already
+ # persisted.
+ existing = await self.get_event_id_from_transaction_id(
+ event.room_id, event.sender, token_id, txn_id
+ )
+ if existing:
+ mapping[event.event_id] = existing
+ txn_id_to_event[(event.room_id, token_id, txn_id)] = existing
+ else:
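+                    # Remember this event so that any later duplicates in the
+                    # same batch are mapped to it.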
+ txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id
+
+ return mapping
+
+ async def _cleanup_old_transaction_ids(self):
+        """Cleans out transaction ID mappings older than 24 hours.
+ """
+
+ def _cleanup_old_transaction_ids_txn(txn):
+ sql = """
+ DELETE FROM event_txn_id
+ WHERE inserted_ts < ?
+ """
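+            # `inserted_ts` records when the mapping was added, so this removes
+            # anything older than 24 hours.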
+ one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
+ txn.execute(sql, (one_day_ago,))
+
+ return await self.db_pool.runInteraction(
+ "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn,
+ )