summary refs log tree commit diff
path: root/synapse/storage/databases/main/transactions.py
diff options
context:
space:
mode:
authorreivilibre <38398653+reivilibre@users.noreply.github.com>2020-09-04 15:06:51 +0100
committerGitHub <noreply@github.com>2020-09-04 15:06:51 +0100
commit17fa4c7ca79b45baebb84d49adbcd1a2a9ea3f09 (patch)
treea85bb2c427db4568f1a628d38bc93929aaee160c /synapse/storage/databases/main/transactions.py
parentCatch-up after Federation Outage (split, 1) (#8230) (diff)
downloadsynapse-17fa4c7ca79b45baebb84d49adbcd1a2a9ea3f09.tar.xz
Catch up after Federation Outage (split, 2): Track last successful stream ordering after transmission (#8247)
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r--synapse/storage/databases/main/transactions.py38
1 files changed, 38 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index efc425f95d..c0a958252e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -333,3 +333,41 @@ class TransactionStore(SQLBaseStore):
             ["stream_ordering"],
             [(stream_ordering,)] * len(rows),
         )
+
+    async def get_destination_last_successful_stream_ordering(
+        self, destination: str
+    ) -> Optional[int]:
+        """
+        Gets the stream ordering of the PDU most-recently successfully sent
+        to the specified destination, or None if this information has not been
+        tracked yet.
+
+        Args:
+            destination: the destination to query
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            "destinations",
+            {"destination": destination},
+            "last_successful_stream_ordering",
+            allow_none=True,
+            desc="get_last_successful_stream_ordering",
+        )
+
+    async def set_destination_last_successful_stream_ordering(
+        self, destination: str, last_successful_stream_ordering: int
+    ) -> None:
+        """
+        Marks that we have successfully sent the PDUs up to and including the
+        one specified.
+
+        Args:
+            destination: the destination we have successfully sent to
+            last_successful_stream_ordering: the stream_ordering of the most
+                recent successfully-sent PDU
+        """
+        return await self.db_pool.simple_upsert(
+            "destinations",
+            keyvalues={"destination": destination},
+            values={"last_successful_stream_ordering": last_successful_stream_ordering},
+            desc="set_last_successful_stream_ordering",
+        )