summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py7
-rw-r--r--synapse/storage/engines/sqlite3.py2
-rw-r--r--synapse/storage/event_federation.py2
-rw-r--r--synapse/storage/events.py6
-rw-r--r--synapse/storage/stream.py59
6 files changed, 42 insertions, 36 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index c91c7a3729..5a9e7720d9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -59,7 +59,7 @@ logger = logging.getLogger(__name__)
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120*1000
+LAST_SEEN_GRANULARITY = 120 * 1000
 
 
 class DataStore(RoomMemberStore, RoomStore,
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5e77320540..cfb87d9328 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -185,7 +185,7 @@ class SQLBaseStore(object):
             time_then = self._previous_loop_ts
             self._previous_loop_ts = time_now
 
-            ratio = (curr - prev)/(time_now - time_then)
+            ratio = (curr - prev) / (time_now - time_then)
 
             top_three_counters = self._txn_perf_counters.interval(
                 time_now - time_then, limit=3
@@ -643,7 +643,10 @@ class SQLBaseStore(object):
         if not iterable:
             defer.returnValue(results)
 
-        chunks = [iterable[i:i+batch_size] for i in xrange(0, len(iterable), batch_size)]
+        chunks = [
+            iterable[i:i + batch_size]
+            for i in xrange(0, len(iterable), batch_size)
+        ]
         for chunk in chunks:
             rows = yield self.runInteraction(
                 desc,
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 400c10103c..91fac33b8b 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -54,7 +54,7 @@ class Sqlite3Engine(object):
 
 def _parse_match_info(buf):
     bufsize = len(buf)
-    return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+    return [struct.unpack('@I', buf[i:i + 4])[0] for i in range(0, bufsize, 4)]
 
 
 def _rank(raw_match_info):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5f32eec6f8..ce2c794025 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -58,7 +58,7 @@ class EventFederationStore(SQLBaseStore):
             new_front = set()
             front_list = list(front)
             chunks = [
-                front_list[x:x+100]
+                front_list[x:x + 100]
                 for x in xrange(0, len(front), 100)
             ]
             for chunk in chunks:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5e85552029..4d7cdd00d0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -84,7 +84,7 @@ class EventsStore(SQLBaseStore):
                 event.internal_metadata.stream_ordering = stream
 
             chunks = [
-                events_and_contexts[x:x+100]
+                events_and_contexts[x:x + 100]
                 for x in xrange(0, len(events_and_contexts), 100)
             ]
 
@@ -740,7 +740,7 @@ class EventsStore(SQLBaseStore):
         rows = []
         N = 200
         for i in range(1 + len(events) / N):
-            evs = events[i*N:(i + 1)*N]
+            evs = events[i * N:(i + 1) * N]
             if not evs:
                 break
 
@@ -755,7 +755,7 @@ class EventsStore(SQLBaseStore):
                 " LEFT JOIN rejections as rej USING (event_id)"
                 " LEFT JOIN redactions as r ON e.event_id = r.redacts"
                 " WHERE e.event_id IN (%s)"
-            ) % (",".join(["?"]*len(evs)),)
+            ) % (",".join(["?"] * len(evs)),)
 
             txn.execute(sql, evs)
             rows.extend(self.cursor_to_dict(txn))
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index a03458c2fc..2c49a5e499 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -168,7 +168,7 @@ class StreamStore(SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
             res = yield defer.gatherResults([
                 self.get_room_events_stream_for_room(
                     room_id, from_key, to_key, limit
@@ -220,28 +220,30 @@ class StreamStore(SQLBaseStore):
 
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
-            ret.reverse()
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            if rows:
-                key = "s%d" % min(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = from_key
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-            return ret, key
-        res = yield self.runInteraction("get_room_events_stream_for_room", f)
-        defer.returnValue(res)
+        ret.reverse()
 
-    def get_room_changes_for_user(self, user_id, from_key, to_key):
+        if rows:
+            key = "s%d" % min(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = from_key
+
+        defer.returnValue((ret, key))
+
+    @defer.inlineCallbacks
+    def get_membership_changes_for_user(self, user_id, from_key, to_key):
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -249,14 +251,14 @@ class StreamStore(SQLBaseStore):
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
-            return defer.succeed([])
+            defer.returnValue([])
 
         if from_id:
             has_changed = self._membership_stream_cache.has_entity_changed(
                 user_id, int(from_id)
             )
             if not has_changed:
-                return defer.succeed([])
+                defer.returnValue([])
 
         def f(txn):
             if from_id is not None:
@@ -281,17 +283,18 @@ class StreamStore(SQLBaseStore):
                 txn.execute(sql, (user_id, to_id,))
             rows = self.cursor_to_dict(txn)
 
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows
+
+        rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
-            self._set_before_and_after(ret, rows, topo_order=False)
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret
+        self._set_before_and_after(ret, rows, topo_order=False)
 
-        return self.runInteraction("get_room_changes_for_user", f)
+        defer.returnValue(ret)
 
     def get_room_events_stream(
         self,