summary refs log tree commit diff
path: root/tests/replication/tcp/streams/test_receipts.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/replication/tcp/streams/test_receipts.py')
-rw-r--r--tests/replication/tcp/streams/test_receipts.py62
1 files changed, 50 insertions, 12 deletions
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index d5a99f6caa..56b062ecc1 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,35 +12,73 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.replication.tcp.streams._base import ReceiptsStreamRow
 
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+# type: ignore
+
+from mock import Mock
+
+from synapse.replication.tcp.streams._base import ReceiptsStream
+
+from tests.replication._base import BaseStreamTestCase
 
 USER_ID = "@feeling:blue"
-ROOM_ID = "!room:blue"
-EVENT_ID = "$event:blue"
 
 
 class ReceiptsStreamTestCase(BaseStreamTestCase):
+    def _build_replication_data_handler(self):
+        return Mock(wraps=super()._build_replication_data_handler())
+
     def test_receipt(self):
-        # make the client subscribe to the receipts stream
-        self.replicate_stream("receipts", "NOW")
+        self.reconnect()
 
         # tell the master to send a new receipt
         self.get_success(
             self.hs.get_datastore().insert_receipt(
-                ROOM_ID, "m.read", USER_ID, [EVENT_ID], {"a": 1}
+                "!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1}
             )
         )
         self.replicate()
 
         # there should be one RDATA command
-        rdata_rows = self.test_handler.received_rdata_rows
+        self.test_handler.on_rdata.assert_called_once()
+        stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+        self.assertEqual(stream_name, "receipts")
         self.assertEqual(1, len(rdata_rows))
-        self.assertEqual(rdata_rows[0][0], "receipts")
-        row = rdata_rows[0][2]  # type: ReceiptsStreamRow
-        self.assertEqual(ROOM_ID, row.room_id)
+        row = rdata_rows[0]  # type: ReceiptsStream.ReceiptsStreamRow
+        self.assertEqual("!room:blue", row.room_id)
         self.assertEqual("m.read", row.receipt_type)
         self.assertEqual(USER_ID, row.user_id)
-        self.assertEqual(EVENT_ID, row.event_id)
+        self.assertEqual("$event:blue", row.event_id)
         self.assertEqual({"a": 1}, row.data)
+
+        # Now let's disconnect and insert some data.
+        self.disconnect()
+
+        self.test_handler.on_rdata.reset_mock()
+
+        self.get_success(
+            self.hs.get_datastore().insert_receipt(
+                "!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2}
+            )
+        )
+        self.replicate()
+
+        # Nothing should have happened as we are disconnected
+        self.test_handler.on_rdata.assert_not_called()
+
+        self.reconnect()
+        self.pump(0.1)
+
+        # We should now have caught up and get the missing data
+        self.test_handler.on_rdata.assert_called_once()
+        stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+        self.assertEqual(stream_name, "receipts")
+        self.assertEqual(token, 3)
+        self.assertEqual(1, len(rdata_rows))
+
+        row = rdata_rows[0]  # type: ReceiptsStream.ReceiptsStreamRow
+        self.assertEqual("!room2:blue", row.room_id)
+        self.assertEqual("m.read", row.receipt_type)
+        self.assertEqual(USER_ID, row.user_id)
+        self.assertEqual("$event2:foo", row.event_id)
+        self.assertEqual({"a": 2}, row.data)