summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py22
-rw-r--r--synapse/storage/pdu.py16
2 files changed, 26 insertions, 12 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index b846081d49..2243a710db 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -20,6 +20,8 @@ from synapse.api.events.room import (
     RoomConfigEvent, RoomNameEvent,
 )
 
+from synapse.util.logutils import log_function
+
 from .directory import DirectoryStore
 from .feedback import FeedbackStore
 from .presence import PresenceStore
@@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore
 from .transactions import TransactionStore
 
 import json
+import logging
 import os
 
 
+logger = logging.getLogger(__name__)
+
+
 class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, PduStore, StatePduStore, TransactionStore,
@@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore,
         self.min_token = None
 
     @defer.inlineCallbacks
+    @log_function
     def persist_event(self, event, backfilled=False):
         if event.type == RoomMemberEvent.TYPE:
             yield self._store_room_member(event)
@@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore,
         defer.returnValue(event)
 
     @defer.inlineCallbacks
+    @log_function
     def _store_event(self, event, backfilled):
         # FIXME (erikj): This should be removed when we start amalgamating
         # event and pdu storage
@@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore,
             if not self.min_token_deferred.called:
                 yield self.min_token_deferred
             self.min_token -= 1
-            vals["token_ordering"] = self.min_token
+            vals["stream_ordering"] = self.min_token
 
         unrec = {
             k: v
@@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore,
         }
         vals["unrecognized_keys"] = json.dumps(unrec)
 
-        yield self._simple_insert("events", vals)
+        try:
+            yield self._simple_insert("events", vals)
+        except:
+            logger.exception("Failed to persist, probably duplicate")
+            return
 
         if not backfilled and hasattr(event, "state_key"):
             vals = {
@@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore,
     def _get_min_token(self):
         row = yield self._execute(
             None,
-            "SELECT MIN(token_ordering) FROM events"
+            "SELECT MIN(stream_ordering) FROM events"
         )
 
-        self.min_token = rows[0][0] if rows and rows[0] else 0
+        self.min_token = min(row[0][0], -1) if row and row[0] else -1
+
+        logger.debug("min_token is: %s", self.min_token)
 
         defer.returnValue(self.min_token)
 
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index a24ce7ab78..7655f43ede 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
 from ._base import SQLBaseStore, Table, JoinHelper
 
 from synapse.util.logutils import log_function
@@ -319,6 +321,7 @@ class PduStore(SQLBaseStore):
 
         return [(row[0], row[1], row[2]) for row in results]
 
+    @defer.inlineCallbacks
     def get_oldest_pdus_in_context(self, context):
         """Get a list of Pdus that we haven't backfilled beyond yet (and haven't    
         seen). This list is used when we want to backfill backwards and is the 
@@ -331,17 +334,14 @@ class PduStore(SQLBaseStore):
         Returns:
             list: A list of PduIdTuple.
         """
-        return self._db_pool.runInteraction(
-            self._get_oldest_pdus_in_context, context
-        )
-
-    def _get_oldest_pdus_in_context(self, txn, context):
-        txn.execute(
+        results = yield self._execute(
+            None,
             "SELECT pdu_id, origin FROM %(back)s WHERE context = ?"
             % {"back": PduBackwardExtremitiesTable.table_name, },
-            (context,)
+            context
         )
-        return [PduIdTuple(i, o) for i, o in txn.fetchall()]
+
+        defer.returnValue([PduIdTuple(i, o) for i, o in results])
 
     def is_pdu_new(self, pdu_id, origin, context, depth):
         """For a given Pdu, try and figure out if it's 'new', i.e., if it's