diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 01f8339825..d0350ee5fe 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.types import RoomStreamToken
-from .stream import lower_bound
import logging
-import ujson as json
+import simplejson as json
+
+from six import iteritems
logger = logging.getLogger(__name__)
@@ -99,7 +99,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id):
sql = (
- "SELECT stream_ordering, topological_ordering"
+ "SELECT stream_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
@@ -111,17 +111,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
- topological_ordering = results[0][1]
return self._get_unread_counts_by_pos_txn(
- txn, room_id, user_id, topological_ordering, stream_ordering
+ txn, room_id, user_id, stream_ordering
)
- def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
- stream_ordering):
- token = RoomStreamToken(
- topological_ordering, stream_ordering
- )
+ def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
@@ -132,10 +127,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" WHERE"
" user_id = ?"
" AND room_id = ?"
- " AND %s"
- ) % (lower_bound(token, self.database_engine, inclusive=False),)
+ " AND stream_ordering > ?"
+ )
- txn.execute(sql, (user_id, room_id))
+ txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
notify_count = row[0] if row else 0
@@ -155,10 +150,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
- " AND %s"
- ) % (lower_bound(token, self.database_engine, inclusive=False),)
+ " AND stream_ordering > ?"
+ )
- txn.execute(sql, (user_id, room_id))
+ txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
highlight_count = row[0] if row else 0
@@ -209,7 +204,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" ep.highlight "
" FROM ("
" SELECT room_id,"
- " MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
@@ -219,13 +213,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" event_push_actions AS ep"
" WHERE"
" ep.room_id = rl.room_id"
- " AND ("
- " ep.topological_ordering > rl.topological_ordering"
- " OR ("
- " ep.topological_ordering = rl.topological_ordering"
- " AND ep.stream_ordering > rl.stream_ordering"
- " )"
- " )"
+ " AND ep.stream_ordering > rl.stream_ordering"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
@@ -318,7 +306,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" ep.highlight, e.received_ts"
" FROM ("
" SELECT room_id,"
- " MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
@@ -329,13 +316,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
" ep.room_id = rl.room_id"
- " AND ("
- " ep.topological_ordering > rl.topological_ordering"
- " OR ("
- " ep.topological_ordering = rl.topological_ordering"
- " AND ep.stream_ordering > rl.stream_ordering"
- " )"
- " )"
+ " AND ep.stream_ordering > rl.stream_ordering"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
@@ -441,13 +422,14 @@ class EventPushActionsWorkerStore(SQLBaseStore):
txn.executemany(sql, (
_gen_entry(user_id, actions)
- for user_id, actions in user_id_actions.iteritems()
+ for user_id, actions in iteritems(user_id_actions)
))
return self.runInteraction(
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)
+ @defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB
@@ -456,13 +438,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
event_id (str)
"""
- return self._simple_delete(
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event_id,
- },
- desc="remove_push_actions_from_staging",
- )
+ try:
+ res = yield self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+ defer.returnValue(res)
+ except Exception:
+ # this method is called from an exception handler, so propagating
+ # another exception here really isn't helpful - there's nothing
+ # the caller can do about it. Just log the exception and move on.
+ logger.exception(
+ "Error removing push actions after event persistence failure",
+ )
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
@@ -752,10 +743,10 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
- topological_ordering, stream_ordering):
+ stream_ordering):
"""
Purges old push actions for a user and room before a given
- topological_ordering.
+ stream_ordering.
We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.
@@ -764,7 +755,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
- topological_ordering: The lowest topological ordering which will
+ stream_ordering: The lowest stream ordering which will
not be deleted.
"""
txn.call_after(
@@ -783,9 +774,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
- " topological_ordering <= ?"
+ " stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
- (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+ (user_id, room_id, stream_ordering, self.stream_ordering_month_ago)
)
txn.execute("""
|