summary refs log tree commit diff
path: root/synapse/storage/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/appservice.py')
-rw-r--r--synapse/storage/appservice.py67
1 files changed, 30 insertions, 37 deletions
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 31248d5e06..6092f600ba 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -51,8 +51,7 @@ def _make_exclusive_regex(services_cache):
 class ApplicationServiceWorkerStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         self.services_cache = load_appservices(
-            hs.hostname,
-            hs.config.app_service_config_files
+            hs.hostname, hs.config.app_service_config_files
         )
         self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
 
@@ -122,8 +121,9 @@ class ApplicationServiceStore(ApplicationServiceWorkerStore):
     pass
 
 
-class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
-                                               EventsWorkerStore):
+class ApplicationServiceTransactionWorkerStore(
+    ApplicationServiceWorkerStore, EventsWorkerStore
+):
     @defer.inlineCallbacks
     def get_appservices_by_state(self, state):
         """Get a list of application services based on their state.
@@ -135,9 +135,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             may be empty.
         """
         results = yield self._simple_select_list(
-            "application_services_state",
-            dict(state=state),
-            ["as_id"]
+            "application_services_state", dict(state=state), ["as_id"]
         )
         # NB: This assumes this class is linked with ApplicationServiceStore
         as_list = self.get_app_services()
@@ -180,9 +178,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             A Deferred which resolves when the state was set successfully.
         """
         return self._simple_upsert(
-            "application_services_state",
-            dict(as_id=service.id),
-            dict(state=state)
+            "application_services_state", dict(as_id=service.id), dict(state=state)
         )
 
     def create_appservice_txn(self, service, events):
@@ -195,6 +191,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
         Returns:
             AppServiceTransaction: A new transaction.
         """
+
         def _create_appservice_txn(txn):
             # work out new txn id (highest txn id for this service += 1)
             # The highest id may be the last one sent (in which case it is last_txn)
@@ -204,7 +201,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
 
             txn.execute(
                 "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
-                (service.id,)
+                (service.id,),
             )
             highest_txn_id = txn.fetchone()[0]
             if highest_txn_id is None:
@@ -217,16 +214,11 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             txn.execute(
                 "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
                 "VALUES(?,?,?)",
-                (service.id, new_txn_id, event_ids)
-            )
-            return AppServiceTransaction(
-                service=service, id=new_txn_id, events=events
+                (service.id, new_txn_id, event_ids),
             )
+            return AppServiceTransaction(service=service, id=new_txn_id, events=events)
 
-        return self.runInteraction(
-            "create_appservice_txn",
-            _create_appservice_txn,
-        )
+        return self.runInteraction("create_appservice_txn", _create_appservice_txn)
 
     def complete_appservice_txn(self, txn_id, service):
         """Completes an application service transaction.
@@ -252,26 +244,26 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
                     "appservice: Completing a transaction which has an ID > 1 from "
                     "the last ID sent to this AS. We've either dropped events or "
                     "sent it to the AS out of order. FIX ME. last_txn=%s "
-                    "completing_txn=%s service_id=%s", last_txn_id, txn_id,
-                    service.id
+                    "completing_txn=%s service_id=%s",
+                    last_txn_id,
+                    txn_id,
+                    service.id,
                 )
 
             # Set current txn_id for AS to 'txn_id'
             self._simple_upsert_txn(
-                txn, "application_services_state", dict(as_id=service.id),
-                dict(last_txn=txn_id)
+                txn,
+                "application_services_state",
+                dict(as_id=service.id),
+                dict(last_txn=txn_id),
             )
 
             # Delete txn
             self._simple_delete_txn(
-                txn, "application_services_txns",
-                dict(txn_id=txn_id, as_id=service.id)
+                txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id)
             )
 
-        return self.runInteraction(
-            "complete_appservice_txn",
-            _complete_appservice_txn,
-        )
+        return self.runInteraction("complete_appservice_txn", _complete_appservice_txn)
 
     @defer.inlineCallbacks
     def get_oldest_unsent_txn(self, service):
@@ -284,13 +276,14 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             A Deferred which resolves to an AppServiceTransaction or
             None.
         """
+
         def _get_oldest_unsent_txn(txn):
             # Monotonically increasing txn ids, so just select the smallest
             # one in the txns table (we delete them when they are sent)
             txn.execute(
                 "SELECT * FROM application_services_txns WHERE as_id=?"
                 " ORDER BY txn_id ASC LIMIT 1",
-                (service.id,)
+                (service.id,),
             )
             rows = self.cursor_to_dict(txn)
             if not rows:
@@ -301,8 +294,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             return entry
 
         entry = yield self.runInteraction(
-            "get_oldest_unsent_appservice_txn",
-            _get_oldest_unsent_txn,
+            "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn
         )
 
         if not entry:
@@ -312,14 +304,14 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
 
         events = yield self._get_events(event_ids)
 
-        defer.returnValue(AppServiceTransaction(
-            service=service, id=entry["txn_id"], events=events
-        ))
+        defer.returnValue(
+            AppServiceTransaction(service=service, id=entry["txn_id"], events=events)
+        )
 
     def _get_last_txn(self, txn, service_id):
         txn.execute(
             "SELECT last_txn FROM application_services_state WHERE as_id=?",
-            (service_id,)
+            (service_id,),
         )
         last_txn_id = txn.fetchone()
         if last_txn_id is None or last_txn_id[0] is None:  # no row exists
@@ -332,6 +324,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             txn.execute(
                 "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
             )
+
         return self.runInteraction(
             "set_appservice_last_pos", set_appservice_last_pos_txn
         )
@@ -362,7 +355,7 @@ class ApplicationServiceTransactionWorkerStore(ApplicationServiceWorkerStore,
             return upper_bound, [row[1] for row in rows]
 
         upper_bound, event_ids = yield self.runInteraction(
-            "get_new_events_for_appservice", get_new_events_for_appservice_txn,
+            "get_new_events_for_appservice", get_new_events_for_appservice_txn
         )
 
         events = yield self._get_events(event_ids)