Airflow Triggerer facing frequent restarts #33647
Comments
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval. |
Thanks for the detailed analysis and the cost breakdown before/after. This is super helpful. Looks like an index hint might be needed or something like that. Very interesting one. I will mark it for 2.7.1, hoping maybe someone will have time to fix it before then. |
2.6.3 had a known issue with Triggerer health which was fixed in 2.7.0 #33089 |
I don't think so - this looks more like the trigger query taking far too long because the DB optimiser does not choose the right plan to execute it efficiently. |
Hello @potiuk thank you for the reply. For now, is there a way we can solve this, other than regularly running the ANALYZE command? |
No idea. Because I do not know the reason yet. |
Someone will have to take a look and investigate it |
Sure, thanks for help though :) |
To be honest, it would be better to have a rule not to use query hints. For example, in vanilla Postgres everything in this area is left to the planner. In general, it is better to get rid of non-constant-sized IN filters (the couple of statuses for tasks and dags) and replace them with other methods; one possible alternative is sketched below.
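One way to avoid a large, variable-size IN list (a sketch only, not necessarily the method meant above; the temporary-table name and helper function are made up for illustration) is to stage the ids in a temporary table and join against it, so the optimiser sees an ordinary join instead of an ever-changing IN expression:

```python
from sqlalchemy import text
from sqlalchemy.orm import Session


def fetch_triggers_by_ids(session: Session, ids: list[int]):
    # Stage the ids in a session-scoped temporary table (MySQL syntax shown).
    session.execute(text("CREATE TEMPORARY TABLE IF NOT EXISTS _trigger_ids (id BIGINT PRIMARY KEY)"))
    session.execute(text("DELETE FROM _trigger_ids"))
    if ids:
        session.execute(text("INSERT INTO _trigger_ids (id) VALUES (:id)"), [{"id": i} for i in ids])
    # Join against the staged ids instead of using `trigger.id IN (...)`.
    return session.execute(
        text("SELECT t.* FROM `trigger` t JOIN _trigger_ids i ON i.id = t.id")
    ).all()
``` |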
@shubhransh-eb I guess you use MySQL backend? If so, I wonder which version? |
And I am currently thinking about how to optimize or get rid of the magic wrapper: airflow/airflow/utils/sqlalchemy.py Line 535 in 32a490e
|
Hello , |
Not sure how much help this will provide, but we are facing this issue now around every week, or at most once every two weeks. Just to mitigate it, we have added alerts on top of our database (MySQL) so we get alerted if CPU spikes (because of the frequent restarts of the triggerer), and then we manually run the ANALYZE command. |
First of all, I would recommend considering upgrading to a newer version of MySQL; 5.7 is almost EOL, and even if Amazon keeps supporting MySQL 5.7 on Aurora, there is a big chance that Airflow will stop supporting MySQL 5.7 in versions released after 31 Oct 2023. That means further improvements in the triggerer would not be available. In addition, 8.0 should provide a better query analyser/planner. Just make sure that you test the migration on a snapshot of the DB before doing this on the production database.
Anyway, I inspected the data transfers between the Triggerer and TriggerJob; it might help someone (maybe it will be me) who wants to optimise this: |
Seems like steps 1-4 might be executed in one query, with some additional overhead on the data captured, but it might reduce execution time on the DB side; however, it would require additional filtering on the client (Airflow) side. |
Hello @Taragolis Just wanted to confirm: what you are suggesting is that the query used for triggers needs to be updated to solve this, correct? |
Let me explain how I see the options we have here (I have not done a detailed analysis of what is wrong - these are somewhat informed guesses). I am not sure you can do much more NOW than analysing the tables periodically until the code of Airflow is updated (but maybe you can also attempt to PR some changes). Likely we need to look at the steps involved and optimise the way the DB is used.
Running analyze frequently should likely help you (and you can likely even schedule it every day, for example) - but I think fundamentally someone (maybe @Taragolis or someone else) needs to optimise the way we run queries to get rid of the effect you see when you have a huge number of triggers happening.
The root cause is - I believe - that when you add and delete a lot of data, at some point in time the built-in optimiser of MySQL gets confused about the fastest execution plan to get the data, and likely produces a plan that is not at all optimised. I think the main reason is that indexes are never rebuilt: they grow in size when data is frequently deleted and added, and at some point the optimizer sees that the index is so big that it is better and faster not to use it at all. This is why "analyze" helps - it looks at the actual data left and allows the optimizer to find a better, more optimized way to read the data; it rebuilds the index and makes it much smaller, and then the optimizer starts using it again. I see two ways you could approach it:
These are somewhat guesses - maybe @Taragolis, who has done a bit more analysis, can also confirm whether my thinking is right. |
Thanks for the suggestions @potiuk. For now we run the ANALYZE command regularly. |
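For anyone who wants to automate that mitigation, a minimal sketch of a maintenance DAG that refreshes the optimizer statistics daily could look like the following (the dag_id and the table list are illustrative assumptions; it assumes a MySQL metadata DB reachable from the worker, with privileges to run ANALYZE):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.session import create_session
from sqlalchemy import text


def analyze_tables():
    # ANALYZE TABLE refreshes the statistics the MySQL optimizer uses when planning queries.
    with create_session() as session:
        session.execute(text("ANALYZE TABLE task_instance"))
        session.execute(text("ANALYZE TABLE `trigger`"))


with DAG(
    dag_id="metadata_db_analyze",  # hypothetical dag_id
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
):
    PythonOperator(task_id="analyze_metadata_tables", python_callable=analyze_tables)
```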
To be honest, I had a look after I initially found this issue, but I was lying in bed checking the code through a browser on an iPad and just forgot to write a message. That means all findings need to be verified first. I assume that we use this approach:
Another problem is that we operate with |
I like the position of one Postgres-vendor developer about hints, something like: "Maybe we want to have hints in vanilla Postgres, but not in the same way they are implemented in Oracle; in our product we need to implement something closely related to help people who migrate from OracleDB to our product." In general this comes from the fact that statistics are in most cases better, especially when it comes to CBO (Cost-Based Optimisation) or the next generation of CBO. The problem with a hint is that it fixes things "here and now": it might work in this particular case, with this particular amount of data, these particular indexes, this particular amount of memory, for this particular user - and as soon as some of those parameters change, things could become worse, or not improve at all compared to not having the hint. This is just my personal position: "A query hint is a solution of last resort, after you have tried all the other last-resort solutions."
That is nice.
@shubhransh-eb I'm not an expert on MySQL, but is there any configuration which might turn on/off automatic gathering of table statistics (aka ANALYZE)? Or maybe by design you are expected to manually run ANALYZE from time to time. Compared to Postgres, I know for sure that the autoanalyze daemon runs in the background there, and if a user turns it off then queries on high-intensity workloads become slower over time. But even with the Postgres autoanalyze daemon, in some cases it is better to manually run ANALYZE TABLE, especially after a huge delete + insert. |
I am not an expert in MySQL either, but from what I found on the internet, I don't think MySQL automatically runs the analyze command; that could be the reason why we have to run it manually to make this work. |
Also, in our case this issue happens when we have around 150 sensors starting at around the same time (within a minute). |
I think MySQL should run something to gather statistics; without it, it is hardly possible to calculate costs for queries. A manual ANALYZE in this case is something like: "Forget everything you know about the table and collect new statistics."
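For reference, InnoDB (the default MySQL storage engine) does keep persistent statistics and, with innodb_stats_auto_recalc enabled (the default), recalculates them automatically - but only after roughly 10% of a table's rows have changed, which can lag badly behind huge delete/insert bursts. A quick way to check the relevant settings (the connection string below is a placeholder):

```python
from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqldb://user:password@host/airflow")  # placeholder DSN

with engine.connect() as conn:
    # innodb_stats_persistent / innodb_stats_auto_recalc control whether and when
    # InnoDB refreshes the statistics used by the query planner.
    for name, value in conn.execute(text("SHOW VARIABLES LIKE 'innodb_stats%'")):
        print(name, value)
```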
That could also be a reason. Again, I am not an expert in MySQL and have no idea how it handles many simultaneous connections. In Postgres it is quite an expensive operation (time + memory), so the base recommendation was to use a connection pooler between the DB and Airflow. Internally Airflow uses the SQLAlchemy pooler, but it is limited to a single process, so it is better to have something in between; since you use managed MySQL on AWS, you might try RDS Proxy. And last but not least, another possible reason is the fact that most of the deferrable operators are not truly async, especially something like TaskStateTrigger, which might keep a session open for a very long time and prevent gathering statistics from the database. It should not be a problem on Postgres, but who knows, maybe it is a problem for MySQL. This one is my assumption. |
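To illustrate the per-process pooling mentioned above (a sketch only; the connection string and numbers are placeholders, and Airflow exposes equivalent sql_alchemy_* settings in its config rather than requiring you to build an engine by hand):

```python
from sqlalchemy import create_engine

# Every Airflow process (scheduler, triggerer, webserver worker, ...) keeps its own
# pool like this, so the limits are per process; an external pooler such as RDS Proxy
# can still consolidate connections across all of them.
engine = create_engine(
    "mysql+mysqldb://user:password@host/airflow",  # placeholder DSN
    pool_size=5,         # connections kept open per process
    max_overflow=10,     # extra connections allowed under bursts
    pool_recycle=1800,   # recycle connections periodically to avoid stale ones
    pool_pre_ping=True,  # validate a connection before handing it out
)
```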
So we had an incident in the morning, where we had to run the following commands to bring back the table:
Our hunch is that since trigger_id is set to , we have updated the value in our table and will analyze it; another possible solution could be to set the page_size for this table higher so that it can handle this. |
@shubhransh-eb Can you please add the previous version of Airflow where it worked fine? We upgraded from Airflow 2.3.4 to 2.7.2. We are also using MySQL 8 (self-hosted) and faced the query taking a long time even with a single running trigger entry, in one of the environments where the db migration was done in place and which has a lot of records in other tables. The triggerer just hangs with the query executing. In the other environment, the db migration was done on a backup-and-restored database with fewer records, and the triggerer job runs there with no problem. We found by trial and error that making dag_run load lazily helped the triggerer process run fine, though the list-triggers page in the UI still hangs when trying to use it with active triggers. It loads fine without lazy loading when there is no trigger, though. I am not sure if it is relevant to you, but the query mentioned in the issue description was the same one taking a long time in our case, so I thought I would add it here.
airflow/airflow/models/trigger.py Lines 99 to 110 in 4824ca7
Lazy-load dag_run. We tried noload, but it seems this is needed for logging trigger logs (there were some db changes to reference task_instance for logging), though I am not sure if that could be the issue with MySQL.

```python
def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[int, Trigger]:
    """Fetch all the Triggers by ID and return a dict mapping ID -> Trigger instance."""
    query = session.scalars(
        select(cls)
        .where(cls.id.in_(ids))
        .options(
            joinedload("task_instance").lazyload("dag_run"),
            joinedload("task_instance.trigger"),
            joinedload("task_instance.trigger.triggerer_job"),
        )
    )
    return {obj.id: obj for obj in query}
``` |
Hello @tirkarthi |
For what it's worth, I managed to fix this issue by cleaning up some old records from the table. From 2.8.0 onwards, |
@QuintenBruynseraede we are doing somewhat the same thing; as mentioned, we are deleting all task instances older than 90 days, but we will check this as well. Thanks for the help :) |
Hi @arunravimv, you can open a PR with your suggested code changes and we can review the specific suggestion on the PR itself. |
Hi @eladkal, we've implemented a patch in which the joinedload of task_instance is replaced with selectinload. Below are the code snippets and EXPLAIN ANALYZE output for both variants.

joinedload

Airflow Code Snippet

```python
query = session.scalars(
    select(cls)
    .where(cls.id.in_(ids))
    .options(
        joinedload("task_instance"),
        joinedload("task_instance.trigger"),
        joinedload("task_instance.trigger.triggerer_job"),
    )
)
```

Explain Analyze

```
-> Nested loop left join  (cost=101 rows=95) (actual time=0.22..0.359 rows=3 loops=1)
    -> Nested loop left join  (cost=67.8 rows=95) (actual time=0.21..0.348 rows=3 loops=1)
        -> Nested loop left join  (cost=34.6 rows=95) (actual time=0.204..0.338 rows=3 loops=1)
            -> Filter: (`trigger`.id in (969,968,984))  (cost=1.36 rows=3) (actual time=0.049..0.0565 rows=3 loops=1)
                -> Index range scan on trigger using PRIMARY over (id = 968) OR (id = 969) OR (id = 984)  (cost=1.36 rows=3) (actual time=0.048..0.0545 rows=3 loops=1)
            -> Nested loop inner join  (cost=35.9 rows=31.7) (actual time=0.0915..0.0932 rows=1 loops=3)
                -> Index lookup on task_instance_1 using ti_trigger_id (trigger_id=`trigger`.id)  (cost=8.97 rows=31.7) (actual time=0.0716..0.0731 rows=1 loops=3)
                -> Single-row index lookup on dag_run_1 using dag_run_dag_id_run_id_key (dag_id=task_instance_1.dag_id, run_id=task_instance_1.run_id)  (cost=0.251 rows=1) (actual time=0.0194..0.0195 rows=1 loops=3)
        -> Single-row index lookup on trigger_1 using PRIMARY (id=task_instance_1.trigger_id)  (cost=0.251 rows=1) (actual time=0.00301..0.00305 rows=1 loops=3)
    -> Single-row index lookup on job_1 using PRIMARY (id=trigger_1.triggerer_id)  (cost=0.251 rows=1) (actual time=0.00316..0.00321 rows=1 loops=3)
```

selectinload

Airflow Code Snippet

```python
query = session.scalars(
    select(cls)
    .where(cls.id.in_(ids))
    .options(
        selectinload("task_instance"),
        joinedload("task_instance.trigger"),
        joinedload("task_instance.trigger.triggerer_job"),
    )
)
```

Explain Analyze

```
-> Nested loop inner join  (cost=5.26 rows=3) (actual time=0.0895..0.362 rows=3 loops=1)
    -> Nested loop left join  (cost=4.21 rows=3) (actual time=0.065..0.313 rows=3 loops=1)
        -> Nested loop left join  (cost=3.16 rows=3) (actual time=0.0521..0.298 rows=3 loops=1)
            -> Index range scan on task_instance using ti_trigger_id over (trigger_id = 968) OR (trigger_id = 969) OR (trigger_id = 984), with index condition: (task_instance.trigger_id in (969,968,984))  (cost=2.11 rows=3) (actual time=0.0399..0.273 rows=3 loops=1)
            -> Single-row index lookup on trigger_1 using PRIMARY (id=task_instance.trigger_id)  (cost=0.283 rows=1) (actual time=0.00755..0.00759 rows=1 loops=3)
        -> Single-row index lookup on job_1 using PRIMARY (id=trigger_1.triggerer_id)  (cost=0.283 rows=1) (actual time=0.00454..0.00458 rows=1 loops=3)
    -> Single-row index lookup on dag_run_1 using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.283 rows=1) (actual time=0.016..0.016 rows=1 loops=3)
``` |
That would be best |
@arunravimv We are also facing an issue where the trigger entry is inserted into the table but takes 20-30 seconds to be picked up by the triggerer, causing delays between trigger creation and the trigger actually being executed. The delay is not consistent, though; sometimes the triggerer picks up the trigger quickly. We are using MySQL with a large number of task instances and active dags. Regarding performance improvements, did this patch help with reducing that delay, or with any other specific performance issue? Thanks |
@tirkarthi we are able to pick triggers up in the poll almost immediately (1-2 seconds), but I think this also depends on a lot of parameters like DB configuration/load and the number of concurrent triggers and triggerer processes you are running. |
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
We are using Airflow version: 2.6.3
We have the metastore in AWS.
At around 12:00 AM PST, we have around 150+ async sensors starting at the same time. They act as our sensors to wait for upstream data. We have them waiting for around 6-12 hours daily. Now, after the upgrade, after running for 4-5 days we see that the triggerer gets restarted automatically.
On investigation we found that the query used by the triggerer to get the list of triggers is taking a lot of time, causing the triggerer to kill the Python code and hence restart.
We are able to resolve it after running the analyze command on the task_instance table.
Query used:
It takes around 5-6 minutes, and after running the analyze command it takes less than 1 second.
The number of sensors is the same before and after the upgrade.
What you think should happen instead
No response
How to reproduce
Operating System
NAME="Debian GNU/Linux" VERSION_ID="11" VERSION="11 (bullseye)"
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct