From 7e2c89a37f3a5261f43b4d472b36219ac41dfb16 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 6 Apr 2016 15:42:15 +0100 Subject: Make pushers use the event_push_actions table instead of listening on an event stream & running the rules again. Sytest passes, but remaining to do: * Make badges work again * Remove old, unused code --- synapse/storage/schema/delta/31/pushers.py | 75 ++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 synapse/storage/schema/delta/31/pushers.py (limited to 'synapse/storage/schema/delta') diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py new file mode 100644 index 0000000000..7e0e385fb5 --- /dev/null +++ b/synapse/storage/schema/delta/31/pushers.py @@ -0,0 +1,75 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Change the last_token to last_stream_ordering now that pushers no longer +# listen on an event stream but instead select out of the event_push_actions +# table. + + +import logging + +logger = logging.getLogger(__name__) + + +def token_to_stream_ordering(token): + return int(token[1:].split('_')[0]) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + logger.info("Porting pushers table, delta 31...") + cur.execute(""" + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """) + cur.execute("""SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """) + count = 0 + for row in cur.fetchall(): + row = list(row) + row[12] = token_to_stream_ordering(row[12]) + cur.execute(database_engine.convert_param_style(""" + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))), + row + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count) -- cgit 1.4.1 From 2d5c693fd3c72800980f906b8255e3619ac524e2 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 16:43:54 +0100 Subject: Fix port script for changes merged from develop --- synapse/storage/schema/delta/31/pushers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage/schema/delta') diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py index 7e0e385fb5..d07bab012f 100644 --- a/synapse/storage/schema/delta/31/pushers.py +++ b/synapse/storage/schema/delta/31/pushers.py @@ -27,7 +27,7 @@ def token_to_stream_ordering(token): return int(token[1:].split('_')[0]) -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_create(cur, database_engine, *args, **kwargs): logger.info("Porting pushers table, delta 31...") cur.execute(""" CREATE TABLE IF NOT EXISTS pushers2 ( @@ -73,3 +73,6 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute("DROP TABLE pushers") cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass -- cgit 1.4.1 From 05d044aac396de9dff64ffb47e8b9a3f43ad0919 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 7 Apr 2016 16:45:38 +0100 Subject: pep8 --- synapse/storage/schema/delta/31/pushers.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage/schema/delta') diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py index d07bab012f..93367fa09e 100644 --- a/synapse/storage/schema/delta/31/pushers.py +++ b/synapse/storage/schema/delta/31/pushers.py @@ -74,5 +74,6 @@ def run_create(cur, database_engine, *args, **kwargs): cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) + def run_upgrade(cur, database_engine, *args, **kwargs): pass -- cgit 1.4.1