summary refs log tree commit diff
path: root/synapse/storage/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/pusher.py')
-rw-r--r--synapse/storage/pusher.py54
1 files changed, 41 insertions, 13 deletions
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 2582a1da66..08ea62681b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -21,34 +21,62 @@ from synapse.api.errors import StoreError
 from syutil.jsonutil import encode_canonical_json
 
 import logging
+import simplejson as json
+import types
 
 logger = logging.getLogger(__name__)
 
 
 class PusherStore(SQLBaseStore):
+    def _decode_pushers_rows(self, rows):
+        for r in rows:
+            dataJson = r['data']
+            r['data'] = None
+            try:
+                if isinstance(dataJson, types.BufferType):
+                    dataJson = str(dataJson).decode("UTF8")
+
+                r['data'] = json.loads(dataJson)
+            except Exception as e:
+                logger.warn(
+                    "Invalid JSON in data for pusher %d: %s, %s",
+                    r['id'], dataJson, e.message,
+                )
+                pass
+
+            if isinstance(r['pushkey'], types.BufferType):
+                r['pushkey'] = str(r['pushkey']).decode("UTF8")
+
+        return rows
+
     @defer.inlineCallbacks
     def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
-        sql = (
-            "SELECT * FROM pushers "
-            "WHERE app_id = ? AND pushkey = ?"
-        )
+        def r(txn):
+            sql = (
+                "SELECT * FROM pushers"
+                " WHERE app_id = ? AND pushkey = ?"
+            )
 
-        rows = yield self._execute_and_decode(
-            "get_pushers_by_app_id_and_pushkey",
-            sql,
-            app_id, pushkey
+            txn.execute(sql, (app_id, pushkey,))
+            rows = self.cursor_to_dict(txn)
+
+            return self._decode_pushers_rows(rows)
+
+        rows = yield self.runInteraction(
+            "get_pushers_by_app_id_and_pushkey", r
         )
 
         defer.returnValue(rows)
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
-        sql = (
-            "SELECT * FROM pushers"
-        )
+        def get_pushers(txn):
+            txn.execute("SELECT * FROM pushers")
+            rows = self.cursor_to_dict(txn)
 
-        rows = yield self._execute_and_decode("get_all_pushers", sql)
+            return self._decode_pushers_rows(rows)
 
+        rows = yield self.runInteraction("get_all_pushers", get_pushers)
         defer.returnValue(rows)
 
     @defer.inlineCallbacks
@@ -72,7 +100,7 @@ class PusherStore(SQLBaseStore):
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
                     lang=lang,
-                    data=encode_canonical_json(data).decode("UTF-8"),
+                    data=encode_canonical_json(data),
                 ),
                 insertion_values=dict(
                     id=next_id,