| diff --git a/changelog.d/9900.bugfix b/changelog.d/9900.bugfix
new file mode 100644
index 0000000000..a8470fca3f
--- /dev/null
+++ b/changelog.d/9900.bugfix
@@ -0,0 +1 @@
+Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
 index e9f618bb5a..ebbc234334 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2045,18 +2045,40 @@ class PresenceFederationQueue:
             )
             return result["updates"], result["upto_token"], result["limited"]
 
+        # If the from_token is the current token then there's nothing to return
+        # and we can trivially no-op.
+        if from_token == self._next_id - 1:
+            return [], upto_token, False
+
         # We can find the correct position in the queue by noting that there is
         # exactly one entry per stream ID, and that the last entry has an ID of
         # `self._next_id - 1`, so we can count backwards from the end.
         #
+        # Since we are returning all states in the range `from_token < stream_id
+        # <= upto_token` we look for the index with a `stream_id` of `from_token
+        # + 1`.
+        #
         # Since the start of the queue is periodically truncated we need to
         # handle the case where `from_token` stream ID has already been dropped.
-        start_idx = max(from_token - self._next_id, -len(self._queue))
+        start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
 
         to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
         limited = False
         new_id = upto_token
         for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+            if stream_id <= from_token:
+                # Paranoia check that we are actually only sending states that
+                # are have stream_id strictly greater than from_token. We should
+                # never hit this.
+                logger.warning(
+                    "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
+                    stream_id,
+                    from_token,
+                    self._next_id,
+                    len(self._queue),
+                )
+                continue
+
             if stream_id > upto_token:
                 break
 
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
 index 61271cd084..ce330e79cc 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -509,6 +509,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
 
         self.assertCountEqual(rows, expected_rows)
 
+        now_token = self.queue.get_current_token(self.instance_name)
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", upto_token, now_token, 10)
+        )
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+        self.assertCountEqual(rows, [])
+
     def test_send_and_get_split(self):
         state1 = UserPresenceState.default("@user1:test")
         state2 = UserPresenceState.default("@user2:test")
@@ -538,6 +546,20 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
 
         self.assertCountEqual(rows, expected_rows)
 
+        now_token = self.queue.get_current_token(self.instance_name)
+        rows, upto_token, limited = self.get_success(
+            self.queue.get_replication_rows("master", upto_token, now_token, 10)
+        )
+
+        self.assertEqual(upto_token, now_token)
+        self.assertFalse(limited)
+
+        expected_rows = [
+            (2, ("dest3", "@user3:test")),
+        ]
+
+        self.assertCountEqual(rows, expected_rows)
+
     def test_clear_queue_all(self):
         state1 = UserPresenceState.default("@user1:test")
         state2 = UserPresenceState.default("@user2:test")
 |