To run tasks asynchronously, call them with the .submit method. If you call a task as you would a normal Python function, it runs synchronously, even if you call it within a flow that uses the ConcurrentTaskRunner, DaskTaskRunner, or RayTaskRunner.
Many real-world data workflows benefit from true parallel, distributed task execution. For these use cases, the following Prefect-developed task runners for parallel task execution may be installed as Prefect Integrations.
DaskTaskRunner runs tasks requiring parallel execution using dask.distributed.
RayTaskRunner runs tasks requiring parallel execution using Ray.
These task runners can spin up a local Dask cluster or Ray instance on the fly, or let you connect with a Dask or Ray environment you've set up separately. Then you can take advantage of massively parallel computing environments.
Use Dask or Ray in your flows to choose the execution environment that fits your particular needs.
To show you how they work, let's start small.
Remote storage
We recommend configuring remote file storage for task execution with DaskTaskRunner or RayTaskRunner. This ensures tasks executing in Dask or Ray have access to task result storage, particularly when accessing a Dask or Ray instance outside of your execution environment.
You may have seen this briefly in a previous tutorial, but let's look a bit more closely at how you can configure a specific task runner for a flow.
Let's start with the SequentialTaskRunner. This task runner runs all tasks synchronously, which can make it a useful debugging tool alongside async code.
Consider this simple flow. We import the SequentialTaskRunner, specify a task_runner on the flow, and call the tasks with .submit().
This basic flow won't benefit from parallel execution, but let's proceed so you can see just how simple it is to use the DaskTaskRunner for more complex flows.
Configure your flow to use the DaskTaskRunner:
Make sure the prefect-dask collection is installed by running pip install -U prefect-dask.
In your flow code, import DaskTaskRunner from prefect_dask.task_runners.
Assign it as the task runner when the flow is defined using the task_runner=DaskTaskRunner argument.
Use the .submit method when calling task-decorated functions.
Note that, because you're using DaskTaskRunner in a script, you must use if __name__ == "__main__": or you'll see warnings and errors.
Run dask_flow.py. If you get a warning about accepting incoming network connections, that's okay - everything is local in this example.
python dask_flow.py
DaskTaskRunner automatically creates a local Dask cluster, then starts executing all of the task runs in parallel. The results do not return in the same order as the sequential code above.
Ray does not support non-x86/64 architectures such as ARM/M1 processors when installed from pip alone, so it will be skipped during installation of Prefect. It is possible to manually install the blocking component with conda. See the Ray documentation for instructions.
Now run ray_flow.py. RayTaskRunner automatically creates a local Ray instance, then immediately starts executing all of the tasks in parallel. If you have an existing Ray instance, you can provide the address as a parameter to run tasks in the instance. See Running tasks on Ray for details.
Many workflows include a variety of tasks, and not all of them benefit from parallel execution. You'll most likely want to use the Dask or Ray task runners and spin up their respective resources only for those tasks that need them.
Because task runners are specified on flows, you can assign different task runners to tasks by using subflows to organize those tasks.
This example uses the same tasks as the previous examples, but on the parent flow greetings() we use the default ConcurrentTaskRunner. Then we call a ray_greetings() subflow that uses the RayTaskRunner to execute the same tasks in a Ray instance.
If you save this as ray_subflow.py and run it, you'll see that the flow greetings runs as you'd expect for a concurrent flow, then flow ray-greetings spins up a Ray instance to run the tasks again.