diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index a96f0f0183..5fca3bd772 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -24,90 +24,127 @@ import logging
logger = logging.getLogger(__name__)
+
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
- def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+ def __init__(self, _hs, user_name, app_id, app_instance_id,
+ app_display_name, device_display_name, pushkey, data,
last_token, last_success, failing_since):
self.hs = _hs
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.user_name = user_name
- self.app = app
+ self.app_id = app_id
+ self.app_instance_id = app_instance_id
self.app_display_name = app_display_name
self.device_display_name = device_display_name
self.pushkey = pushkey
self.data = data
self.last_token = last_token
+ self.last_success = last_success # not actually used
self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.failing_since = None
+ self.failing_since = failing_since
@defer.inlineCallbacks
def start(self):
if not self.last_token:
- # First-time setup: get a token to start from (we can't just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case we fail to dispatch the push)
+ # First-time setup: get a token to start from (we can't
+ # just start from no token, ie. 'now'
+ # because we need the result to be reproduceable in case
+ # we fail to dispatch the push)
config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=0)
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_name, config, timeout=0)
self.last_token = chunk['end']
- self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+ self.store.update_pusher_last_token(
+ self.user_name, self.pushkey, self.last_token)
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token)
while True:
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
- chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_name, config, timeout=100*365*24*60*60*1000)
- # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
- singleEvent = None
+ # limiting to 1 may get 1 event plus 1 presence event, so
+ # pick out the actual event
+ single_event = None
for c in chunk['chunk']:
- if 'event_id' in c: # Hmmm...
- singleEvent = c
+ if 'event_id' in c: # Hmmm...
+ single_event = c
break
- if not singleEvent:
+ if not single_event:
continue
- ret = yield self.dispatchPush(singleEvent)
- if (ret):
+ ret = yield self.dispatch_push(single_event)
+ if ret:
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
- self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
- self.last_token, self.clock.time_msec())
+ self.store.update_pusher_last_token_and_success(
+ self.user_name,
+ self.pushkey,
+ self.last_token,
+ self.clock.time_msec()
+ )
if self.failing_since:
self.failing_since = None
- self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+ self.store.update_pusher_failing_since(
+ self.user_name,
+ self.pushkey,
+ self.failing_since)
else:
if not self.failing_since:
self.failing_since = self.clock.time_msec()
- self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+ self.store.update_pusher_failing_since(
+ self.user_name,
+ self.pushkey,
+ self.failing_since
+ )
- if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
- # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
+ if self.failing_since and \
+ self.failing_since < \
+ self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
# of old notifications.
- logger.warn("Giving up on a notification to user %s, pushkey %s",
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
self.user_name, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
- self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+ self.store.update_pusher_last_token(
+ self.user_name,
+ self.pushkey,
+ self.last_token
+ )
self.failing_since = None
- self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+ self.store.update_pusher_failing_since(
+ self.user_name,
+ self.pushkey,
+ self.failing_since
+ )
else:
- logger.warn("Failed to dispatch push for user %s (failing for %dms)."
+ logger.warn("Failed to dispatch push for user %s "
+ "(failing for %dms)."
"Trying again in %dms",
- self.user_name,
- self.clock.time_msec() - self.failing_since,
- self.backoff_delay
- )
+ self.user_name,
+ self.clock.time_msec() - self.failing_since,
+ self.backoff_delay
+ )
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
- self.backoff_delay *=2
+ self.backoff_delay *= 2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
+ def dispatch_push(self, p):
+ pass
+
class PusherConfigException(Exception):
def __init__(self, msg):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 33d735b974..fd7fe4e39c 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,21 +22,28 @@ import logging
logger = logging.getLogger(__name__)
+
class HttpPusher(Pusher):
- def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+ def __init__(self, _hs, user_name, app_id, app_instance_id,
+ app_display_name, device_display_name, pushkey, data,
last_token, last_success, failing_since):
- super(HttpPusher, self).__init__(_hs,
- user_name,
- app,
- app_display_name,
- device_display_name,
- pushkey,
- data,
- last_token,
- last_success,
- failing_since)
+ super(HttpPusher, self).__init__(
+ _hs,
+ user_name,
+ app_id,
+ app_instance_id,
+ app_display_name,
+ device_display_name,
+ pushkey,
+ data,
+ last_token,
+ last_success,
+ failing_since
+ )
if 'url' not in data:
- raise PusherConfigException("'url' required in data for HTTP pusher")
+ raise PusherConfigException(
+ "'url' required in data for HTTP pusher"
+ )
self.url = data['url']
self.httpCli = SimpleHttpClient(self.hs)
self.data_minus_url = {}
@@ -53,34 +60,36 @@ class HttpPusher(Pusher):
return None
return {
- 'notification': {
- 'transition' : 'new', # everything is new for now: we don't have read receipts
- 'id': event['event_id'],
- 'type': event['type'],
- 'from': event['user_id'],
- # we may have to fetch this over federation and we can't trust it anyway: is it worth it?
- #'fromDisplayName': 'Steve Stevington'
- },
- #'counts': { -- we don't mark messages as read yet so we have no way of knowing
- # 'unread': 1,
- # 'missedCalls': 2
- # },
- 'devices': {
- self.pushkey: {
- 'data' : self.data_minus_url
+ 'notification': {
+ 'transition': 'new',
+ # everything is new for now: we don't have read receipts
+ 'id': event['event_id'],
+ 'type': event['type'],
+ 'from': event['user_id'],
+ # we may have to fetch this over federation and we
+ # can't trust it anyway: is it worth it?
+ #'fromDisplayName': 'Steve Stevington'
+ },
+ #'counts': { -- we don't mark messages as read yet so
+ # we have no way of knowing
+ # 'unread': 1,
+ # 'missedCalls': 2
+ # },
+ 'devices': {
+ self.pushkey: {
+ 'data': self.data_minus_url
}
- }
+ }
}
@defer.inlineCallbacks
- def dispatchPush(self, event):
- notificationDict = self._build_notification_dict(event)
- if not notificationDict:
+ def dispatch_push(self, event):
+ notification_dict = self._build_notification_dict(event)
+ if not notification_dict:
defer.returnValue(True)
try:
- yield self.httpCli.post_json_get_json(self.url, notificationDict)
+ yield self.httpCli.post_json_get_json(self.url, notification_dict)
except:
logger.exception("Failed to push %s ", self.url)
defer.returnValue(False)
defer.returnValue(True)
-
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3fa5a4c4ff..045c36f3b7 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -34,13 +34,17 @@ class PusherPool:
def start(self):
self._pushers_added()
- def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
- # we try to create the pusher just to validate the config: it will then get pulled out of the database,
- # recreated, added and started: this means we have only one code path adding pushers.
+ def add_pusher(self, user_name, kind, app_id, app_instance_id,
+ app_display_name, device_display_name, pushkey, data):
+ # we try to create the pusher just to validate the config: it
+ # will then get pulled out of the database,
+ # recreated, added and started: this means we have only one
+ # code path adding pushers.
self._create_pusher({
"user_name": user_name,
"kind": kind,
- "app": app,
+ "app_id": app_id,
+ "app_instance_id": app_instance_id,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"pushkey": pushkey,
@@ -49,42 +53,55 @@ class PusherPool:
"last_success": None,
"failing_since": None
})
- self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
+ self._add_pusher_to_store(user_name, kind, app_id, app_instance_id,
+ app_display_name, device_display_name,
+ pushkey, data)
@defer.inlineCallbacks
- def _add_pusher_to_store(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
- yield self.store.add_pusher(user_name=user_name,
- kind=kind,
- app=app,
- app_display_name=app_display_name,
- device_display_name=device_display_name,
- pushkey=pushkey,
- data=json.dumps(data))
+ def _add_pusher_to_store(self, user_name, kind, app_id, app_instance_id,
+ app_display_name, device_display_name,
+ pushkey, data):
+ yield self.store.add_pusher(
+ user_name=user_name,
+ kind=kind,
+ app_id=app_id,
+ app_instance_id=app_instance_id,
+ app_display_name=app_display_name,
+ device_display_name=device_display_name,
+ pushkey=pushkey,
+ data=json.dumps(data)
+ )
self._pushers_added()
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
- return HttpPusher(self.hs,
- user_name=pusherdict['user_name'],
- app=pusherdict['app'],
- app_display_name=pusherdict['app_display_name'],
- device_display_name=pusherdict['device_display_name'],
- pushkey=pusherdict['pushkey'],
- data=pusherdict['data'],
- last_token=pusherdict['last_token'],
- last_success=pusherdict['last_success'],
- failing_since=pusherdict['failing_since']
- )
+ return HttpPusher(
+ self.hs,
+ user_name=pusherdict['user_name'],
+ app_id=pusherdict['app_id'],
+ app_instance_id=pusherdict['app_instance_id'],
+ app_display_name=pusherdict['app_display_name'],
+ device_display_name=pusherdict['device_display_name'],
+ pushkey=pusherdict['pushkey'],
+ data=pusherdict['data'],
+ last_token=pusherdict['last_token'],
+ last_success=pusherdict['last_success'],
+ failing_since=pusherdict['failing_since']
+ )
else:
- raise PusherConfigException("Unknown pusher type '%s' for user %s" %
- (pusherdict['kind'], pusherdict['user_name']))
+ raise PusherConfigException(
+ "Unknown pusher type '%s' for user %s" %
+ (pusherdict['kind'], pusherdict['user_name'])
+ )
@defer.inlineCallbacks
def _pushers_added(self):
- pushers = yield self.store.get_all_pushers_after_id(self.last_pusher_started)
+ pushers = yield self.store.get_all_pushers_after_id(
+ self.last_pusher_started
+ )
for p in pushers:
p['data'] = json.loads(p['data'])
- if (len(pushers)):
+ if len(pushers):
self.last_pusher_started = pushers[-1]['id']
self._start_pushers(pushers)
@@ -95,4 +112,4 @@ class PusherPool:
p = self._create_pusher(pusherdict)
if p:
self.pushers.append(p)
- p.start()
\ No newline at end of file
+ p.start()
diff --git a/synapse/rest/pusher.py b/synapse/rest/pusher.py
index 85d0d1c8cd..a39341cd8b 100644
--- a/synapse/rest/pusher.py
+++ b/synapse/rest/pusher.py
@@ -31,30 +31,37 @@ class PusherRestServlet(RestServlet):
content = _parse_json(request)
- reqd = ['kind', 'app', 'app_display_name', 'device_display_name', 'data']
+ reqd = ['kind', 'app_id', 'app_instance_id', 'app_display_name',
+ 'device_display_name', 'data']
missing = []
for i in reqd:
if i not in content:
missing.append(i)
if len(missing):
- raise SynapseError(400, "Missing parameters: "+','.join(missing), errcode=Codes.MISSING_PARAM)
+ raise SynapseError(400, "Missing parameters: "+','.join(missing),
+ errcode=Codes.MISSING_PARAM)
pusher_pool = self.hs.get_pusherpool()
try:
- pusher_pool.add_pusher(user_name=user.to_string(),
- kind=content['kind'],
- app=content['app'],
- app_display_name=content['app_display_name'],
- device_display_name=content['device_display_name'],
- pushkey=pushkey,
- data=content['data'])
+ pusher_pool.add_pusher(
+ user_name=user.to_string(),
+ kind=content['kind'],
+ app_id=content['app_id'],
+ app_instance_id=content['app_instance_id'],
+ app_display_name=content['app_display_name'],
+ device_display_name=content['device_display_name'],
+ pushkey=pushkey,
+ data=content['data']
+ )
except PusherConfigException as pce:
- raise SynapseError(400, "Config Error: "+pce.message, errcode=Codes.MISSING_PARAM)
+ raise SynapseError(400, "Config Error: "+pce.message,
+ errcode=Codes.MISSING_PARAM)
defer.returnValue((200, {}))
- def on_OPTIONS(self, request):
- return (200, {})
+ def on_OPTIONS(self, _):
+ return 200, {}
+
# XXX: C+ped from rest/room.py - surely this should be common?
def _parse_json(request):
@@ -67,5 +74,6 @@ def _parse_json(request):
except ValueError:
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
+
def register_servlets(hs, http_server):
PusherRestServlet(hs).register(http_server)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index ce158c4b18..a858e46f3b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -25,11 +25,13 @@ import logging
logger = logging.getLogger(__name__)
+
class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def get_all_pushers_after_id(self, min_id):
sql = (
- "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, "
+ "SELECT id, user_name, kind, app_id, app_instance_id,"
+ "app_display_name, device_display_name, pushkey, data, "
"last_token, last_success, failing_since "
"FROM pushers "
"WHERE id > ?"
@@ -42,14 +44,15 @@ class PusherStore(SQLBaseStore):
"id": r[0],
"user_name": r[1],
"kind": r[2],
- "app": r[3],
- "app_display_name": r[4],
- "device_display_name": r[5],
- "pushkey": r[6],
- "data": r[7],
- "last_token": r[8],
- "last_success": r[9],
- "failing_since": r[10]
+ "app_id": r[3],
+ "app_instance_id": r[4],
+ "app_display_name": r[5],
+ "device_display_name": r[6],
+ "pushkey": r[7],
+ "data": r[8],
+ "last_token": r[9],
+ "last_success": r[10],
+ "failing_since": r[11]
}
for r in rows
]
@@ -57,12 +60,14 @@ class PusherStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
+ def add_pusher(self, user_name, kind, app_id, app_instance_id,
+ app_display_name, device_display_name, pushkey, data):
try:
yield self._simple_insert(PushersTable.table_name, dict(
user_name=user_name,
kind=kind,
- app=app,
+ app_id=app_id,
+ app_instance_id=app_instance_id,
app_display_name=app_display_name,
device_display_name=device_display_name,
pushkey=pushkey,
@@ -76,23 +81,27 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def update_pusher_last_token(self, user_name, pushkey, last_token):
- yield self._simple_update_one(PushersTable.table_name,
- {'user_name': user_name, 'pushkey': pushkey},
- {'last_token': last_token}
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'last_token': last_token}
)
@defer.inlineCallbacks
- def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success):
- yield self._simple_update_one(PushersTable.table_name,
- {'user_name': user_name, 'pushkey': pushkey},
- {'last_token': last_token, 'last_success': last_success}
+ def update_pusher_last_token_and_success(self, user_name, pushkey,
+ last_token, last_success):
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'last_token': last_token, 'last_success': last_success}
)
@defer.inlineCallbacks
def update_pusher_failing_since(self, user_name, pushkey, failing_since):
- yield self._simple_update_one(PushersTable.table_name,
- {'user_name': user_name, 'pushkey': pushkey},
- {'failing_since': failing_since}
+ yield self._simple_update_one(
+ PushersTable.table_name,
+ {'user_name': user_name, 'pushkey': pushkey},
+ {'failing_since': failing_since}
)
@@ -103,7 +112,8 @@ class PushersTable(Table):
"id",
"user_name",
"kind",
- "app"
+ "app_id",
+ "app_instance_id",
"app_display_name",
"device_display_name",
"pushkey",
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
index e83f7e7436..b60aeda756 100644
--- a/synapse/storage/schema/delta/v7.sql
+++ b/synapse/storage/schema/delta/v7.sql
@@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
- app varchar(64) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_instance_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
- data text,
+ data blob,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index e83f7e7436..b60aeda756 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
- app varchar(64) NOT NULL,
+ app_id varchar(64) NOT NULL,
+ app_instance_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
- data text,
+ data blob,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,
|