diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_push_actions.py | 49 | ||||
-rw-r--r-- | synapse/storage/events.py | 2 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 27 | ||||
-rw-r--r-- | synapse/storage/schema/delta/31/events.sql | 16 | ||||
-rw-r--r-- | synapse/storage/schema/delta/31/pusher_throttle.sql | 23 |
5 files changed, 104 insertions, 13 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 86a98b6f11..6f316f7d24 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore): max_stream_ordering=None): def get_after_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep, (" - " SELECT room_id, user_id," - " max(topological_ordering) as topological_ordering," - " max(stream_ordering) as stream_ordering" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " + "e.received_ts " + "FROM (" + " SELECT room_id, user_id, " + " max(topological_ordering) as topological_ordering, " + " max(stream_ordering) as stream_ordering " " FROM events" " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" " GROUP BY room_id, user_id" - ") AS rl " - "WHERE" + ") AS rl," + " event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" " ep.room_id = rl.room_id" " AND (" " ep.topological_ordering > rl.topological_ordering" @@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( - "SELECT ep.event_id, ep.stream_ordering, ep.actions " - "FROM event_push_actions AS ep " - "WHERE ep.room_id not in (" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.room_id not in (" " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ? " + " WHERE receipt_type = 'm.read' AND user_id = ?" " GROUP BY room_id" ") AND ep.user_id = ? AND ep.stream_ordering > ?" ) @@ -175,12 +180,30 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue([ { "event_id": row[0], - "stream_ordering": row[1], - "actions": json.loads(row[2]), + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + "received_ts": row[4], } for row in after_read_receipt + no_read_receipt ]) @defer.inlineCallbacks + def get_time_of_last_push_action_before(self, stream_ordering): + def f(txn): + sql = ( + "SELECT e.received_ts" + " FROM event_push_actions AS ep" + " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" + " WHERE ep.stream_ordering > ?" + " ORDER BY ep.stream_ordering ASC" + " LIMIT 1" + ) + txn.execute(sql, (stream_ordering,)) + return txn.fetchone() + result = yield self.runInteraction("get_time_of_last_push_action_before", f) + defer.returnValue(result[0] if result else None) + + @defer.inlineCallbacks def get_latest_push_action_stream_ordering(self): def f(txn): txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions") diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a27b7919cd..4655669ba0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -144,6 +144,7 @@ class EventsStore(SQLBaseStore): def __init__(self, hs): super(EventsStore, self).__init__(hs) + self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) @@ -565,6 +566,7 @@ class EventsStore(SQLBaseStore): "outlier": event.internal_metadata.is_outlier(), "content": encode_json(event.content).decode("UTF-8"), "origin_server_ts": int(event.origin_server_ts), + "received_ts": self._clock.time_msec(), } for event, _ in events_and_contexts ], diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 11feb3eb11..d9afd7ec87 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -233,3 +233,30 @@ class PusherStore(SQLBaseStore): {'failing_since': failing_since}, desc="update_pusher_failing_since", ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self._simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room" + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"] + } + + defer.returnValue(params_by_room) + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + yield self._simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params" + ) diff --git a/synapse/storage/schema/delta/31/events.sql b/synapse/storage/schema/delta/31/events.sql new file mode 100644 index 0000000000..1dd0f9e170 --- /dev/null +++ b/synapse/storage/schema/delta/31/events.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN received_ts BIGINT; diff --git a/synapse/storage/schema/delta/31/pusher_throttle.sql b/synapse/storage/schema/delta/31/pusher_throttle.sql new file mode 100644 index 0000000000..d86d30c13c --- /dev/null +++ b/synapse/storage/schema/delta/31/pusher_throttle.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE pusher_throttle( + pusher BIGINT NOT NULL, + room_id TEXT NOT NULL, + last_sent_ts BIGINT, + throttle_ms BIGINT, + PRIMARY KEY (pusher, room_id) +); |