diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 3d8b4d5d5b..307660b99a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@ import types
logger = logging.getLogger(__name__)
-class PusherStore(SQLBaseStore):
+class PusherWorkerStore(SQLBaseStore):
def _decode_pushers_rows(self, rows):
for r in rows:
dataJson = r['data']
@@ -102,9 +103,6 @@ class PusherStore(SQLBaseStore):
rows = yield self.runInteraction("get_all_pushers", get_pushers)
defer.returnValue(rows)
- def get_pushers_stream_token(self):
- return self._pushers_id_gen.get_current_token()
-
def get_all_updated_pushers(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed(([], []))
@@ -198,6 +196,11 @@ class PusherStore(SQLBaseStore):
defer.returnValue(result)
+
+class PusherStore(PusherWorkerStore):
+ def get_pushers_stream_token(self):
+ return self._pushers_id_gen.get_current_token()
+
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
app_display_name, device_display_name,
@@ -230,14 +233,18 @@ class PusherStore(SQLBaseStore):
)
if newly_inserted:
- # get_if_user_has_pusher only cares if the user has
- # at least *one* pusher.
- self.get_if_user_has_pusher.invalidate(user_id,)
+ self.runInteraction(
+ "add_pusher",
+ self._invalidate_cache_and_stream,
+ self.get_if_user_has_pusher, (user_id,)
+ )
@defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
def delete_pusher_txn(txn, stream_id):
- txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_if_user_has_pusher, (user_id,)
+ )
self._simple_delete_one_txn(
txn,
|