Low-Cost Asynchronous ADF-Python Solution for Long-Running Tasks
- Soumen Das
- Dec 19, 2024
Suppose you need to run a long-running task in Python from Azure Data Factory (ADF), but you don't want the pipeline to wait for it to complete. You would rather perform other tasks in the meantime and check whether the Python task has finished at the end.
What are the possible solutions, and how can we make one that is low cost?
The first solution that comes to mind is to a) write the long-running task in Python as a Custom activity (Custom activities run on an Azure Batch pool), b) embed the Custom activity in its own ADF pipeline, and c) call that pipeline from the main pipeline with an "Execute Pipeline" activity whose "Wait on completion" option is left unchecked. The Execute Pipeline activity then triggers the target pipeline and immediately moves on to the next step in the current pipeline, regardless of whether the triggered pipeline eventually succeeds. Inside the Python script, write a status record to a log or table, so that an Until activity at the end of the main pipeline can check for completion.
Pros: Simple to implement
Cons: If a VM is already provisioned for a self-hosted integration runtime, you still pay extra for the Azure Batch pool's VM node(s).
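For the first solution, the status record that the Custom activity writes could look like the sketch below. It assumes a hypothetical task_status table keyed by run_id; SQLite from the standard library stands in for whatever database you actually use (for example, an Azure SQL table that a Lookup activity reads). The wait_until_finished function mimics in Python what an ADF Until activity does with a Lookup and a Wait inside it: re-check the status until it is terminal or a timeout expires. In the real pipeline the writer (the Custom activity) and the poller (the Until activity) are separate, so this single-process version only illustrates the logic.

```python
import sqlite3
import time
from datetime import datetime, timezone

# Hypothetical status table; any table that ADF can query would do.
SCHEMA = """
CREATE TABLE IF NOT EXISTS task_status (
    run_id     TEXT PRIMARY KEY,
    status     TEXT NOT NULL,
    updated_at TEXT NOT NULL
)
"""
TERMINAL = ("Succeeded", "Failed")


def set_status(conn, run_id, status):
    """Insert or overwrite the status row for one run."""
    conn.execute(
        "INSERT OR REPLACE INTO task_status (run_id, status, updated_at) "
        "VALUES (?, ?, ?)",
        (run_id, status, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


def get_status(conn, run_id):
    """Return the latest status for run_id, or None if it was never started."""
    row = conn.execute(
        "SELECT status FROM task_status WHERE run_id = ?", (run_id,)
    ).fetchone()
    return row[0] if row else None


def run_task_with_status(conn, run_id, task):
    """Run task() and record Running, then Succeeded or Failed."""
    set_status(conn, run_id, "Running")
    try:
        result = task()
    except Exception:
        set_status(conn, run_id, "Failed")
        raise
    set_status(conn, run_id, "Succeeded")
    return result


def wait_until_finished(conn, run_id, timeout_s=60.0, interval_s=5.0):
    """Poll like an ADF Until loop (Lookup + Wait) until a terminal status or timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status(conn, run_id)
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} is still {status!r} after {timeout_s}s")
        time.sleep(interval_s)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    run_task_with_status(conn, "run-001", lambda: time.sleep(0.5))
    print(wait_until_finished(conn, "run-001", timeout_s=10, interval_s=1))  # prints "Succeeded"
```

Recording "Failed" in the except branch matters: without it, a crashed task would leave the row at "Running" and the Until loop would only stop at its timeout.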
The other solution, which is the low-cost one if a VM already exists, is to a) create a Flask application in Python, hosted on IIS in that VM, that exposes a POST API method, and b) start the long-running task inside the POST method asynchronously, for example with a ThreadPoolExecutor from concurrent.futures, as shown below:
###Sample Python code
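import concurrent.futures
import threading
import time

# Long-lived executor. Do NOT wrap it in "with ThreadPoolExecutor() as executor:"
# inside the function: leaving the with-block calls executor.shutdown(wait=True),
# which blocks until the task finishes and defeats the purpose.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Function that simulates a time-consuming task
def long_running_task():
    print(f"Thread started: {threading.current_thread().name}")
    time.sleep(3)  # Simulate a time-consuming task
    print(f"Thread finished: {threading.current_thread().name}")
    return "Task Completed"

# Submit long_running_task to the executor and return at once with a Future
def async_thread_task():
    return executor.submit(long_running_task)

# Main function to run the asynchronous code
def main():
    future = async_thread_task()
    print("Task submitted; the caller continues immediately")
    # ... other work can happen here while the task runs ...
    print(future.result())  # Blocks only if the result is needed before completion

if __name__ == '__main__':
    main()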
####
c) Call the POST API from a Web activity in ADF. Because the POST method returns as soon as the task is submitted, the Web activity triggers the long-running task and the pipeline immediately moves on to the next step, regardless of whether the task eventually succeeds. As in the first solution, the Python script can write a status record to a log or table, which an Until activity checks for completion.
Pros: No extra Azure Batch node VM cost
Cons: Relatively complex to implement
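The post uses Flask on IIS for steps a) to c). To keep the sketch dependency-free, the version below swaps Flask for Python's standard-library http.server, but shows the same pattern: the POST handler submits the task to a long-lived ThreadPoolExecutor and answers 202 Accepted immediately, so the ADF Web activity gets its response right away, and a GET route reports a status that an Until activity could poll. The routes /run-task and /status/<job_id>, the in-memory jobs dictionary, and the call_api client helper (a stand-in for the Web activity) are all hypothetical. In-memory state is lost if the process restarts (for example, when IIS recycles the application pool), which is why persisting status to a log or table, as the post suggests, is the safer choice.

```python
import concurrent.futures
import json
import threading
import time
import urllib.request
import uuid
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# Long-lived executor shared by all requests, so a request never waits for the task.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
jobs = {}  # job_id -> Future; in-memory only, lost if the process restarts


def long_running_task(seconds):
    time.sleep(seconds)  # Simulate a time-consuming task
    return "Task Completed"


class TaskHandler(BaseHTTPRequestHandler):
    def _send_json(self, code, payload):
        body = json.dumps(payload).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        # Hypothetical route that the ADF Web activity would call.
        if self.path != "/run-task":
            self._send_json(404, {"error": "not found"})
            return
        length = int(self.headers.get("Content-Length", 0))
        params = json.loads(self.rfile.read(length) or b"{}")
        job_id = str(uuid.uuid4())
        jobs[job_id] = executor.submit(long_running_task, params.get("seconds", 3))
        # Respond at once with 202 Accepted; the task keeps running in the pool.
        self._send_json(202, {"job_id": job_id})

    def do_GET(self):
        # Hypothetical status route: /status/<job_id>
        future = jobs.get(self.path.rsplit("/", 1)[-1])
        if not self.path.startswith("/status/") or future is None:
            self._send_json(404, {"error": "unknown job"})
        elif not future.done():
            self._send_json(200, {"status": "Running"})
        elif future.exception() is not None:
            self._send_json(200, {"status": "Failed"})
        else:
            self._send_json(200, {"status": "Succeeded"})

    def log_message(self, format, *args):
        pass  # Silence the default per-request logging


def make_server(port=8000):
    return ThreadingHTTPServer(("127.0.0.1", port), TaskHandler)


def call_api(url, payload=None):
    """Stand-in for the ADF Web activity: POST if payload is given, else GET."""
    data = json.dumps(payload).encode() if payload is not None else None
    req = urllib.request.Request(
        url,
        data=data,
        method="POST" if data is not None else "GET",
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read())


if __name__ == "__main__":
    # Self-contained demo: serve on a free port, trigger a task, then poll it.
    server = make_server(0)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    base = f"http://127.0.0.1:{server.server_address[1]}"
    code, body = call_api(f"{base}/run-task", {"seconds": 2})
    print(code, call_api(f"{base}/status/{body['job_id']}")[1])
    jobs[body["job_id"]].result()  # Block here only so the demo can show the final state
    print(call_api(f"{base}/status/{body['job_id']}")[1])
    server.shutdown()
    server.server_close()
```

Returning 202 rather than 200 is a choice of this sketch that signals "accepted, not finished"; the key point is that the handler never calls result() on the Future.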