DbApiHook.insert_rows unnecessarily restarting connections #40609

Open
2 tasks done
potiuk opened this issue Jul 4, 2024 Discussed in #40608 · 25 comments
Comments

@potiuk
Member

potiuk commented Jul 4, 2024

Discussed in #40608

Originally posted by plutaniano July 4, 2024

Apache Airflow Provider(s)

common-sql

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.14.0
apache-airflow-providers-mysql==5.6.1
apache-airflow-providers-postgres==5.11.1

Apache Airflow version

2.9.2

Operating System

MacOS Sonoma 14.5 (docker host)

Deployment

Docker-Compose

Deployment details

I'm using the official Airflow docker-compose.yaml plus a MySQL database; details are in the reproduction steps.

What happened

The database connection is restarted multiple times during a single DbApiHook.insert_rows call.

What you think should happen instead

DbApiHook.insert_rows should create and maintain a single db connection.

How to reproduce

Create a temporary test project

mkdir /tmp/airflow/
cd /tmp/airflow/
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'

Add the following MySQL service to the docker-compose file

  mysql:
    image: mysql:latest
    environment:
      MYSQL_DATABASE: 'db'
      MYSQL_ROOT_PASSWORD: 'airflow'
    ports:
      - '3306:3306'

Run the docker compose

docker compose up -d

Add the following connections to Airflow using docker exec -it airflow-airflow-triggerer-1 bash

airflow connections add postgres_default --conn-uri postgresql://airflow:airflow@postgres
airflow connections add mysql_default --conn-uri mysql://root:airflow@mysql/db

Then open a Python shell and execute the following scripts:

from airflow.providers.postgres.hooks.postgres import PostgresHook

pg = PostgresHook()
pg.run("CREATE TABLE IF NOT EXISTS t (a int)")

pg.insert_rows(
    table="t",
    rows=[[i] for i in range(10_000)],
    target_fields="a",
)

And for MySQL

from airflow.providers.mysql.hooks.mysql import MySqlHook

mysql = MySqlHook()
mysql.run("CREATE TABLE IF NOT EXISTS t (a int)")
mysql.insert_rows(
    table="t",
    rows=[[i] for i in range(100)],
    target_fields="a",
)

Both scripts open multiple connections to the database while inserting, instead of maintaining just one. Postgres seems to recreate the connection every 1000 inserts; MySQL does it after every insert.

Postgres:

>>> pg.insert_rows(
...     table="t",
...     rows=[[i] for i in range(10_000)],
...     target_fields="a",
... )
[2024-07-04T15:08:13.940+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.942+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.996+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:13.997+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.043+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.044+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.090+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.091+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.145+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.146+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.200+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.201+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.245+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.246+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.290+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.291+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.341+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.342+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.393+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.394+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.441+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.441+0000] {sql.py:611} INFO - Done loading. Loaded a total of 10000 rows into t

MySQL

>>> mysql.insert_rows(
...     table="t",
...     rows=[[i] for i in range(100)],
...     target_fields="a",
... )
[2024-07-04T15:08:54.551+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.554+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.555+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.556+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.558+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
...
[2024-07-04T15:08:54.616+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.618+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.619+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.620+0000] {sql.py:611} INFO - Done loading. Loaded a total of 100 rows into t

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@potiuk potiuk added the kind:bug and area:providers labels and removed the needs-triage label Jul 4, 2024
@dosubot dosubot bot added the area:core label Jul 4, 2024
@potiuk
Member Author

potiuk commented Jul 4, 2024

@dabla -> can you please take a look? I am not sure, but I think this is a side effect of:

    @contextmanager
    def _create_autocommit_connection(self, autocommit: bool = False):
        """Context manager that closes the connection after use and detects if autocommit is supported."""
        with closing(self.get_conn()) as conn:
            if self.supports_autocommit:
                self.set_autocommit(conn, autocommit)
            yield conn

@potiuk potiuk removed the area:core label Jul 4, 2024
@potiuk
Member Author

potiuk commented Jul 4, 2024

Actually it's a side effect of #38528

@potiuk
Member Author

potiuk commented Jul 4, 2024

@dabla -> the root cause of the problem is that the connection is now created every time the "placeholder" property is accessed. Would you like to take a stab at fixing it?
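
For illustration, a minimal sketch of the pattern being described - hypothetical names, not the actual provider code: a property whose getter re-fetches the Connection on every access, so each read does a secrets lookup and emits the INFO log seen above.

    from airflow.hooks.base import BaseHook

    class LeakyPlaceholderHook(BaseHook):
        """Hypothetical sketch - not the real DbApiHook implementation."""

        conn_id = "postgres_default"

        @property
        def placeholder(self) -> str:
            # Every access re-fetches the Connection from the secrets backend /
            # metadata DB (emitting the INFO log) just to read one optional
            # value from the connection extra.
            conn = self.get_connection(self.conn_id)
            return conn.extra_dejson.get("placeholder", "%s")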

@dabla
Contributor

dabla commented Jul 5, 2024

@dabla -> the root cause of the problem is that the connection is now created every time the "placeholder" property is accessed. Would you like to take a stab at fixing it?

Will check it today

@dabla
Contributor

dabla commented Jul 5, 2024

@dabla -> the root cause of the problem is that the connection is now created every time the "placeholder" property is accessed. Would you like to take a stab at fixing it?

I could make this a @cached_property but dunno if that will fix the issue

@dabla
Contributor

dabla commented Jul 5, 2024

Created a PR for this issue that should fix it.

@potiuk
Member Author

potiuk commented Jul 5, 2024

Created a PR for this issue that should fix it.

Yeah. I thought about it, and a cached property is "good enough". To @plutaniano and @dabla -> this did not actually cause restarting of the connection that often (so it was not that bad). What it did was retrieve the Connection object from secrets and read its placeholder extra - which had the side effect of producing an INFO log every time the placeholder property was accessed - which in the case of MySQL was every single row, and for executemany-compatible drivers, every chunk.

It had the side effect of making everything slower by a) printing logs and b) accessing the secrets manager / DB to read the connection extra.

Turning placeholder into a cached_property will keep this one extra lookup, but it's a small price to pay for the flexibility we get with a configurable placeholder, so I'd say it's good enough.

@dabla
Contributor

dabla commented Jul 5, 2024

Created a PR for this issue that should fix it.

Yeah. I thought about it, and a cached property is "good enough". To @plutaniano and @dabla -> this did not actually cause restarting of the connection that often (so it was not that bad). What it did was retrieve the Connection object from secrets and read its placeholder extra - which had the side effect of producing an INFO log every time the placeholder property was accessed - which in the case of MySQL was every single row, and for executemany-compatible drivers, every chunk.

It had the side effect of making everything slower by a) printing logs and b) accessing the secrets manager / DB to read the connection extra.

Turning placeholder into a cached_property will keep this one extra lookup, but it's a small price to pay for the flexibility we get with a configurable placeholder, so I'd say it's good enough.

No, the message you see has indeed nothing to do with database connections; it is just retrieving the connection details from Airflow each time, which allow you to create a database connection. But anyway, it will be a good improvement nonetheless.

@potiuk
Member Author

potiuk commented Jul 5, 2024

No, the message you see has indeed nothing to do with database connections; it is just retrieving the connection details from Airflow each time, which allow you to create a database connection. But anyway, it will be a good improvement nonetheless.

Correct. No new connection. But it is much slower now because:

  • logging is expensive (performance)
  • retrieving secrets is EVEN MORE expensive (both performance and money) - we had a lot of customers complaining about the high cost of secrets manager access (in AWS) because they basically pay "per access" - and in this case, the side effect of this change was that you got a secrets-manager call for EVERY INSERTED ROW when your driver did not support executemany 😱

So yeah - the cached property solves both problems: it speeds things up and makes it far less costly.

@potiuk
Member Author

potiuk commented Jul 5, 2024

Ah, also logging might be expensive (money) as well :D - depends on whether you use a remote logging solution and whether it charges "per message".

@dabla
Contributor

dabla commented Jul 5, 2024

Ah, also logging might be expensive (money) as well :D - depends on whether you use a remote logging solution and whether it charges "per message".

Completely agree on that; it will cost in performance as well as in money (disk). Hopefully, in the future, AIP-59 will help us detect such regressions/side effects ;)

@potiuk
Member Author

potiuk commented Jul 5, 2024

Completely agree on that; it will cost in performance as well as in money (disk). Hopefully, in the future, AIP-59 will help us detect such regressions/side effects ;)

Indeed ... Cases like that are very difficult to spot with regular unit testing/code reviews - this one was a side effect going three levels deep, plus it's not obvious that an expression like [self.placeholder for _ in range(n)] actually calls the placeholder getter n times.
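
To make that concrete, a tiny self-contained demo (hypothetical class, nothing Airflow-specific) showing that a property read inside a per-row comprehension runs the getter once per iteration:

    class Demo:
        @property
        def placeholder(self) -> str:
            print("getter called")  # stands in for the secrets lookup + INFO log
            return "%s"

    d = Demo()
    # Building the per-row placeholder list evaluates the property for every row:
    values = [d.placeholder for _ in range(3)]  # prints "getter called" 3 times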

@potiuk
Member Author

potiuk commented Jul 5, 2024

cc: @bjankie1 :D ^^

@plutaniano

plutaniano commented Jul 5, 2024

Thanks a lot, guys. Really appreciate the attention put into this.

@plutaniano

plutaniano commented Jul 5, 2024

For anyone who has the same problem, this should work as a temporary fix until 2.9.3 is out. Just import these hooks instead of the ones from airflow.providers.

from functools import cached_property

from airflow.providers.mysql.hooks.mysql import MySqlHook as _MySqlHook
from airflow.providers.postgres.hooks.postgres import PostgresHook as _PostgresHook


class MySqlHook(_MySqlHook):
    @cached_property
    def placeholder(self):
        # Cache the value once per hook instance instead of re-reading the
        # connection extra (and logging) on every access.
        return super().placeholder


class PostgresHook(_PostgresHook):
    @cached_property
    def placeholder(self):
        return super().placeholder
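
With these subclasses, the placeholder - and the Connection lookup behind it - is resolved once per hook instance instead of once per row or chunk.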

@potiuk
Member Author

potiuk commented Jul 5, 2024

For anyone who has the same problem, this should work as a temporary fix until 2.9.3 is out. Just import these hooks instead of the ones from airflow.providers.

You can also downgrade the common.sql provider to 1.11.1, which did not have the placeholder configurable (it was added in 1.12.0), or upgrade to a new common.sql provider that will be released soon (I am thinking, @eladkal, maybe we should release an ad-hoc common.sql because of that before the 2.9.3 release?)

@eladkal
Contributor

eladkal commented Jul 5, 2024

I plan to cut a provider wave tomorrow

@dabla
Contributor

dabla commented Jul 11, 2024

@potiuk @eladkal
PR #40615 partially fixes the issue, but it doesn't fix it for the JdbcHook. It seems the following properties of the JdbcHook have the same issue as the placeholder property:

  • driver_class
  • driver_path
[2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: __get__
[2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
[2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: _create_autocommit_connection
[2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
[2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: driver_class
[2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
[2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: driver_path
[2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.
[2024-07-11, 09:38:56 UTC] {o365.py:64} INFO - Called by method: driver_path
[2024-07-11, 09:38:56 UTC] {o365.py:67} INFO - Using connection ID 'jdbc_hana' for task execution.

Maybe we should cache the connection within the hook instance so it can be reused without having to worry about which property is using it? The problem is that get_connection is a classmethod, and I would not want to cache the result of the lookup in a static class variable, which isn't a good idea; it would be better if it were cached at the instance level of the hook, but that would mean we would need to change the signature of the get_connection method in BaseHook.

The following methods would need to be changed from:

    @classmethod
    def get_connections(cls, conn_id: str) -> list[Connection]:
        """
        Get all connections as an iterable, given the connection id.

        :param conn_id: connection id
        :return: array of connections
        """
        warnings.warn(
            "`BaseHook.get_connections` method will be deprecated in the future."
            "Please use `BaseHook.get_connection` instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return [cls.get_connection(conn_id)]

    @classmethod
    def get_connection(cls, conn_id: str) -> Connection:
        """
        Get connection, given connection id.

        :param conn_id: connection id
        :return: connection
        """
        from airflow.models.connection import Connection

        conn = Connection.get_connection_from_secrets(conn_id)
        log.info("Using connection ID '%s' for task execution.", conn.conn_id)
        return conn

To:

    def get_connections(self, conn_id: str) -> list[Connection]:
        """
        Get all connections as an iterable, given the connection id.

        :param conn_id: connection id
        :return: array of connections
        """
        warnings.warn(
            "`BaseHook.get_connections` method will be deprecated in the future."
            "Please use `BaseHook.get_connection` instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return [self.get_connection(conn_id)]

    def get_connection(self, conn_id: str) -> Connection:
        """
        Get connection, given connection id.

        :param conn_id: connection id
        :return: connection
        """
        # Assumes self._connection is initialized to None in __init__.
        if self._connection is None:
            from airflow.models.connection import Connection

            self._connection = Connection.get_connection_from_secrets(conn_id)
            log.info("Using connection ID '%s' for task execution.", self._connection.conn_id)
        return self._connection

@potiuk
Member Author

potiuk commented Jul 11, 2024

Ah .. bummer. But we can fix it in the next round - it's very localized, and as we know it's just a lower-performance issue in "insert_rows".

@dabla
Contributor

dabla commented Jul 11, 2024

Ah .. bummer. But we can fix it in the next round - it's very localized, and as we know it's just a lower-performance issue in "insert_rows".

What do you think of the proposed solution above? Or is this too invasive?

@potiuk
Member Author

potiuk commented Jul 11, 2024

What do you think of the proposed solution above? Or is this too invasive?

A bit too invasive, I think. This actually changes the semantics of the methods - someone could rely on the fact that they return a new connection object every time.

I think maybe a variation of that - add an optional reuse_connection flag (or similar) defaulting to False, and set it to True when you access the connection just to retrieve the extra value?

@potiuk
Member Author

potiuk commented Jul 11, 2024

Or even better - add a "get_connection_extra" method that will set that flag - this way, anyone who wants to just retrieve the extra will use that method, and then we will not have to remember to set the flag to True.

@dabla
Contributor

dabla commented Jul 11, 2024

Or even better - add a "get_connection_extra" method that will set that flag - this way, anyone who wants to just retrieve the extra will use that method, and then we will not have to remember to set the flag to True.

Good idea - I think I saw something similar in the JdbcHook already, will do that instead.

@dabla
Contributor

dabla commented Jul 11, 2024

Something like this in DbApiHook, maybe:

    @cached_property
    def connection_extra(self) -> dict:
        conn = self.get_connection(getattr(self, self.conn_name_attr))  # TODO: replace getattr with get_conn_id
        return conn.extra_dejson

Will wait until #40665 is merged, as then I can also use the get_conn_id method, which is cleaner.

@dabla
Contributor

dabla commented Jul 15, 2024

PR #40751 will go even further and cache the connection on the DbApiHook instance; as some hooks were already doing this, it has now become a property in DbApiHook.
