summary refs log tree commit diff
path: root/synapse/replication/slave/storage/receipts.py
diff options
context:
space:
mode:
authorkaiyou <pierre@jaury.eu>2018-04-08 17:56:44 +0200
committerkaiyou <pierre@jaury.eu>2018-04-08 17:56:44 +0200
commita13b7860c6570ad1bb9003e94ad67c761f0cf312 (patch)
treed08b2c50d1c1a6e570c59396660cd83836aa9f14 /synapse/replication/slave/storage/receipts.py
parentMerge remote-tracking branch 'upstream/master' into feat-dockerfile (diff)
parentUpdate README.rst (diff)
downloadsynapse-a13b7860c6570ad1bb9003e94ad67c761f0cf312.tar.xz
Merge remote-tracking branch 'upstream/master' into feat-dockerfile
Diffstat (limited to 'synapse/replication/slave/storage/receipts.py')
-rw-r--r--synapse/replication/slave/storage/receipts.py36
1 files changed, 10 insertions, 26 deletions
diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index b371574ece..1647072f65 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,9 +17,7 @@
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
-from synapse.storage import DataStore
-from synapse.storage.receipts import ReceiptsStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.storage.receipts import ReceiptsWorkerStore
 
 # So, um, we want to borrow a load of functions intended for reading from
 # a DataStore, but we don't want to take functions that either write to the
@@ -29,36 +28,19 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedReceiptsStore(BaseSlavedStore):
+class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
-        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
-
+        # We instantiate this first as the ReceiptsWorkerStore constructor
+        # needs to be able to call get_max_receipt_stream_id
         self._receipts_id_gen = SlavedIdTracker(
             db_conn, "receipts_linearized", "stream_id"
         )
 
-        self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
-        )
-
-    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
-    get_linearized_receipts_for_room = (
-        ReceiptsStore.__dict__["get_linearized_receipts_for_room"]
-    )
-    _get_linearized_receipts_for_rooms = (
-        ReceiptsStore.__dict__["_get_linearized_receipts_for_rooms"]
-    )
-    get_last_receipt_event_id_for_user = (
-        ReceiptsStore.__dict__["get_last_receipt_event_id_for_user"]
-    )
-
-    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
-    get_all_updated_receipts = DataStore.get_all_updated_receipts.__func__
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
 
-    get_linearized_receipts_for_rooms = (
-        DataStore.get_linearized_receipts_for_rooms.__func__
-    )
+    def get_max_receipt_stream_id(self):
+        return self._receipts_id_gen.get_current_token()
 
     def stream_positions(self):
         result = super(SlavedReceiptsStore, self).stream_positions()
@@ -71,6 +53,8 @@ class SlavedReceiptsStore(BaseSlavedStore):
         self.get_last_receipt_event_id_for_user.invalidate(
             (user_id, room_id, receipt_type)
         )
+        self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id)
+        self.get_receipts_for_room.invalidate((room_id, receipt_type))
 
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name == "receipts":