diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 81c3c94b2e..9201a377b6 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -36,7 +36,7 @@ from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
from .stream import StreamStore
-from .pdu import StatePduStore, PduStore
+from .pdu import StatePduStore, PduStore, PdusTable
from .transactions import TransactionStore
from .keys import KeyStore
@@ -47,6 +47,11 @@ import os
logger = logging.getLogger(__name__)
+class _RollbackButIsFineException(Exception):
+ """ This exception is used to rollback a transaction without implying
+ something went wrong.
+ """
+ pass
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
@@ -71,14 +76,16 @@ class DataStore(RoomMemberStore, RoomStore,
self.min_token -= 1
stream_ordering = self.min_token
- latest = yield self._db_pool.runInteraction(
- self._persist_pdu_event_txn,
- pdu=pdu,
- event=event,
- backfilled=backfilled,
- stream_ordering=stream_ordering,
- )
- defer.returnValue(latest)
+ try:
+ yield self._db_pool.runInteraction(
+ self._persist_pdu_event_txn,
+ pdu=pdu,
+ event=event,
+ backfilled=backfilled,
+ stream_ordering=stream_ordering,
+ )
+ except _RollbackButIsFineException as e:
+ pass
@defer.inlineCallbacks
def get_event(self, event_id, allow_none=False):
@@ -116,6 +123,12 @@ class DataStore(RoomMemberStore, RoomStore,
del cols["content"]
del cols["prev_pdus"]
cols["content_json"] = json.dumps(pdu.content)
+
+ unrec_keys.update({
+ k: v for k, v in cols.items()
+ if k not in PdusTable.fields
+ })
+
cols["unrecognized_keys"] = json.dumps(unrec_keys)
logger.debug("Persisting: %s", repr(cols))
@@ -175,11 +188,12 @@ class DataStore(RoomMemberStore, RoomStore,
try:
self._simple_insert_txn(txn, "events", vals)
except:
- logger.exception(
+ logger.warn(
"Failed to persist, probably duplicate: %s",
- event.event_id
+ event.event_id,
+ exc_info=True,
)
- return
+ raise _RollbackButIsFineException("_persist_event")
if not backfilled and hasattr(event, "state_key"):
vals = {
@@ -205,8 +219,6 @@ class DataStore(RoomMemberStore, RoomStore,
}
)
- return self._get_room_events_max_id_txn(txn)
-
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
sql = (
|