diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index feb9d228ae..ffb7d4a25b 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
dict(txn_id=txn_id, as_id=service.id)
)
+ @defer.inlineCallbacks
def get_oldest_unsent_txn(self, service):
"""Get the oldest transaction which has not been sent for this
service.
@@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
A Deferred which resolves to an AppServiceTransaction or
None.
"""
- return self.runInteraction(
+ entry = yield self.runInteraction(
"get_oldest_unsent_appservice_txn",
self._get_oldest_unsent_txn,
service
)
+ if not entry:
+ defer.returnValue(None)
+
+ event_ids = json.loads(entry["event_ids"])
+
+ events = yield self.get_events(event_ids)
+
+ defer.returnValue(AppServiceTransaction(
+ service=service, id=entry["txn_id"], events=events
+ ))
+
def _get_oldest_unsent_txn(self, txn, service):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
@@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
entry = rows[0]
- event_ids = json.loads(entry["event_ids"])
- events = self._get_events_txn(txn, event_ids)
-
- return AppServiceTransaction(
- service=service, id=entry["txn_id"], events=events
- )
+ return entry
def _get_last_txn(self, txn, service_id):
txn.execute(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2b3f79577b..b710505a7e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -762,41 +762,6 @@ class EventsStore(SQLBaseStore):
if e_id in event_map and event_map[e_id]
])
- def _get_events_txn(self, txn, event_ids, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not event_ids:
- return []
-
- event_map = self._get_events_from_cache(
- event_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- missing_events_ids = [e for e in event_ids if e not in event_map]
-
- if not missing_events_ids:
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
-
- missing_events = self._fetch_events_txn(
- txn,
- missing_events_ids,
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- event_map.update(missing_events)
-
- return [
- event_map[e_id] for e_id in event_ids
- if e_id in event_map and event_map[e_id]
- ]
-
def _invalidate_get_event_cache(self, event_id):
for check_redacted in (False, True):
for get_prev_content in (False, True):
@@ -804,18 +769,6 @@ class EventsStore(SQLBaseStore):
(event_id, check_redacted, get_prev_content)
)
- def _get_event_txn(self, txn, event_id, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
-
- events = self._get_events_txn(
- txn, [event_id],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- allow_rejected=allow_rejected,
- )
-
- return events[0] if events else None
-
def _get_events_from_cache(self, events, check_redacted, get_prev_content,
allow_rejected):
event_map = {}
@@ -981,34 +934,6 @@ class EventsStore(SQLBaseStore):
return rows
- def _fetch_events_txn(self, txn, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- return {}
-
- rows = self._fetch_event_rows(
- txn, events,
- )
-
- if not allow_rejected:
- rows[:] = [r for r in rows if not r["rejects"]]
-
- res = [
- self._get_event_from_row_txn(
- txn,
- row["internal_metadata"], row["json"], row["redacts"],
- check_redacted=check_redacted,
- get_prev_content=get_prev_content,
- rejected_reason=row["rejects"],
- )
- for row in rows
- ]
-
- return {
- r.event_id: r
- for r in res
- }
-
@defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted,
check_redacted=True, get_prev_content=False,
@@ -1070,69 +995,6 @@ class EventsStore(SQLBaseStore):
defer.returnValue(ev)
- def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
- check_redacted=True, get_prev_content=False,
- rejected_reason=None):
- d = json.loads(js)
- internal_metadata = json.loads(internal_metadata)
-
- if rejected_reason:
- rejected_reason = self._simple_select_one_onecol_txn(
- txn,
- table="rejections",
- keyvalues={"event_id": rejected_reason},
- retcol="reason",
- )
-
- ev = FrozenEvent(
- d,
- internal_metadata_dict=internal_metadata,
- rejected_reason=rejected_reason,
- )
-
- if check_redacted and redacted:
- ev = prune_event(ev)
-
- redaction_id = self._simple_select_one_onecol_txn(
- txn,
- table="redactions",
- keyvalues={"redacts": ev.event_id},
- retcol="event_id",
- )
-
- ev.unsigned["redacted_by"] = redaction_id
- # Get the redaction event.
-
- because = self._get_event_txn(
- txn,
- redaction_id,
- check_redacted=False
- )
-
- if because:
- ev.unsigned["redacted_because"] = because
-
- if get_prev_content and "replaces_state" in ev.unsigned:
- prev = self._get_event_txn(
- txn,
- ev.unsigned["replaces_state"],
- get_prev_content=False,
- )
- if prev:
- ev.unsigned["prev_content"] = prev.content
- ev.unsigned["prev_sender"] = prev.sender
-
- self._get_event_cache.prefill(
- (ev.event_id, check_redacted, get_prev_content), ev
- )
-
- return ev
-
- def _parse_events_txn(self, txn, rows):
- event_ids = [r["event_id"] for r in rows]
-
- return self._get_events_txn(txn, event_ids)
-
@defer.inlineCallbacks
def count_daily_messages(self):
"""
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 26933e593a..97f9f1929c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore):
@cachedInlineCallbacks()
def get_room_name_and_aliases(self, room_id):
- def f(txn):
+ def get_room_name(txn):
sql = (
- "SELECT event_id FROM current_state_events "
- "WHERE room_id = ? "
+ "SELECT name FROM room_names"
+ " INNER JOIN current_state_events USING (room_id, event_id)"
+ " WHERE room_id = ?"
+ " LIMIT 1"
)
- sql += " AND ((type = 'm.room.name' AND state_key = '')"
- sql += " OR type = 'm.room.aliases')"
-
txn.execute(sql, (room_id,))
- results = self.cursor_to_dict(txn)
+ rows = txn.fetchall()
+ if rows:
+ return rows[0][0]
+ else:
+ return None
- return self._parse_events_txn(txn, results)
+ return [row[0] for row in txn.fetchall()]
- events = yield self.runInteraction("get_room_name_and_aliases", f)
+ def get_room_aliases(txn):
+ sql = (
+ "SELECT content FROM current_state_events"
+ " INNER JOIN events USING (room_id, event_id)"
+ " WHERE room_id = ?"
+ )
+ txn.execute(sql, (room_id,))
+ return [row[0] for row in txn.fetchall()]
+
+ name = yield self.runInteraction("get_room_name", get_room_name)
+ alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
- name = None
aliases = []
- for e in events:
- if e.type == 'm.room.name':
- if 'name' in e.content:
- name = e.content['name']
- elif e.type == 'm.room.aliases':
- if 'aliases' in e.content:
- aliases.extend(e.content['aliases'])
+ for c in alias_contents:
+ try:
+ content = json.loads(c)
+ except:
+ continue
+
+ aliases.extend(content.get('aliases', []))
defer.returnValue((name, aliases))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 0224299625..12941d1775 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
import re
+import ujson as json
logger = logging.getLogger(__name__)
@@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
- "SELECT stream_ordering, event_id FROM events"
+ "SELECT stream_ordering, event_id, room_id, type, content FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
- rows = txn.fetchall()
+ rows = self.cursor_to_dict(txn)
if not rows:
return 0
- min_stream_id = rows[-1][0]
- event_ids = [row[1] for row in rows]
-
- events = self._get_events_txn(txn, event_ids)
+ min_stream_id = rows[-1]["stream_ordering"]
event_search_rows = []
- for event in events:
+ for row in rows:
try:
- event_id = event.event_id
- room_id = event.room_id
- content = event.content
- if event.type == "m.room.message":
+ event_id = row["event_id"]
+ room_id = row["room_id"]
+ etype = row["type"]
+ try:
+ content = json.loads(row["content"])
+ except:
+ continue
+
+ if etype == "m.room.message":
key = "content.body"
value = content["body"]
- elif event.type == "m.room.topic":
+ elif etype == "m.room.topic":
key = "content.topic"
value = content["topic"]
- elif event.type == "m.room.name":
+ elif etype == "m.room.name":
key = "content.name"
value = content["name"]
except (KeyError, AttributeError):
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 95b12559a6..b9ad965fd6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore):
return True
return False
- ret = self._get_events_txn(
- txn,
- # apply the filter on the room id list
- [
- r["event_id"] for r in rows
- if app_service_interested(r)
- ],
- get_prev_content=True
- )
+ return [r for r in rows if app_service_interested(r)]
- self._set_before_and_after(ret, rows)
+ rows = yield self.runInteraction("get_appservice_room_stream", f)
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
- return ret, key
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
- results = yield self.runInteraction("get_appservice_room_stream", f)
- defer.returnValue(results)
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ defer.returnValue((ret, key))
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
|