summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-01-05 10:54:22 +0000
committerErik Johnston <erik@matrix.org>2018-01-05 10:54:22 +0000
commit18e3a16e8b2303e6b638f679b5b8533e329cbe7a (patch)
tree93b4f34678d9521e39d54585d6adc66319514bcd /synapse/replication
parentMerge pull request #2737 from Valodim/master (diff)
parentBump version and changelog (diff)
downloadsynapse-18e3a16e8b2303e6b638f679b5b8533e329cbe7a.tar.xz
Merge branch 'release-v0.26.0' of github.com:matrix-org/synapse v0.26.0
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/events.py39
-rw-r--r--synapse/replication/tcp/resource.py6
2 files changed, 10 insertions, 35 deletions
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 94ebbffc1b..29d7296b43 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -12,20 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
+import logging
 
 from synapse.api.constants import EventTypes
 from synapse.storage import DataStore
-from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.event_push_actions import EventPushActionsStore
-from synapse.storage.state import StateStore
+from synapse.storage.roommember import RoomMemberStore
+from synapse.storage.state import StateGroupReadStore
 from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-import logging
-
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 logger = logging.getLogger(__name__)
 
@@ -39,7 +37,7 @@ logger = logging.getLogger(__name__)
 # the method descriptor on the DataStore and chuck them into our class.
 
 
-class SlavedEventStore(BaseSlavedStore):
+class SlavedEventStore(StateGroupReadStore, BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
         super(SlavedEventStore, self).__init__(db_conn, hs)
@@ -90,25 +88,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_unread_counts_by_pos_txn = (
         DataStore._get_unread_counts_by_pos_txn.__func__
     )
-    _get_state_group_for_events = (
-        StateStore.__dict__["_get_state_group_for_events"]
-    )
-    _get_state_group_for_event = (
-        StateStore.__dict__["_get_state_group_for_event"]
-    )
-    _get_state_groups_from_groups = (
-        StateStore.__dict__["_get_state_groups_from_groups"]
-    )
-    _get_state_groups_from_groups_txn = (
-        DataStore._get_state_groups_from_groups_txn.__func__
-    )
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
-    get_current_state_ids = (
-        StateStore.__dict__["get_current_state_ids"]
-    )
-    get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
     _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
     has_room_changed_since = DataStore.has_room_changed_since.__func__
 
@@ -134,12 +116,6 @@ class SlavedEventStore(BaseSlavedStore):
         DataStore.get_room_events_stream_for_room.__func__
     )
     get_events_around = DataStore.get_events_around.__func__
-    get_state_for_event = DataStore.get_state_for_event.__func__
-    get_state_for_events = DataStore.get_state_for_events.__func__
-    get_state_groups = DataStore.get_state_groups.__func__
-    get_state_groups_ids = DataStore.get_state_groups_ids.__func__
-    get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__
-    get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__
     get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__
     get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__
     _get_joined_users_from_context = (
@@ -169,10 +145,7 @@ class SlavedEventStore(BaseSlavedStore):
     _get_rooms_for_user_where_membership_is_txn = (
         DataStore._get_rooms_for_user_where_membership_is_txn.__func__
     )
-    _get_state_for_groups = DataStore._get_state_for_groups.__func__
-    _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
     _get_events_around_txn = DataStore._get_events_around_txn.__func__
-    _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__
 
     get_backfill_events = DataStore.get_backfill_events.__func__
     _get_backfill_events = DataStore._get_backfill_events.__func__
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d03e79b85..786c3fe864 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -216,11 +216,12 @@ class ReplicationStreamer(object):
             self.federation_sender.federation_ack(token)
 
     @measure_func("repl.on_user_sync")
+    @defer.inlineCallbacks
     def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
         """A client has started/stopped syncing on a worker.
         """
         user_sync_counter.inc()
-        self.presence_handler.update_external_syncs_row(
+        yield self.presence_handler.update_external_syncs_row(
             conn_id, user_id, is_syncing, last_sync_ms,
         )
 
@@ -244,11 +245,12 @@ class ReplicationStreamer(object):
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
     @measure_func("repl.on_user_ip")
+    @defer.inlineCallbacks
     def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
         """The client saw a user request
         """
         user_ip_cache_counter.inc()
-        self.store.insert_client_ip(
+        yield self.store.insert_client_ip(
             user_id, access_token, ip, user_agent, device_id, last_seen,
         )