Dependencies are often needed in Python API programs. For example, users may need to use third-party Python libraries in Python user-defined functions. In addition, in scenarios such as machine learning prediction, users may want to load a machine learning model inside the Python user-defined functions.
When the PyFlink job is executed locally, users could install the third-party Python libraries into the local Python environment, download the machine learning model to the local machine, etc. However, this approach doesn't work well when users want to submit the PyFlink jobs to remote clusters. In the following sections, we will introduce the options provided in PyFlink for these requirements.
Note Both the Python DataStream API and the Python Table API provide APIs for each kind of dependency. If you mix the Python DataStream API and the Python Table API in a single job, you should specify the dependencies via the Python DataStream API so that they work for both APIs.
If third-party JARs are used, you can specify the JARs in the Python Table API as follows:
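For example (a configuration sketch; `table_env` is assumed to be an existing `TableEnvironment`, and the JAR paths are placeholders for your own files):

```python
# Ship the JARs with the job; multiple JARs are separated by ";".
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

# JARs that should only be added to the classpath can be specified
# via pipeline.classpaths:
table_env.get_config().get_configuration().set_string(
    "pipeline.classpaths", "file:///my/jar/path/connector.jar")
```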
or in the Python DataStream API as follows:
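For example (a sketch; `stream_execution_environment` is assumed to be an existing `StreamExecutionEnvironment`, and the paths are placeholders):

```python
# Ship the JARs with the job:
stream_execution_environment.add_jars(
    "file:///my/jar/path/connector1.jar",
    "file:///my/jar/path/connector2.jar")

# Or only add them to the classpath:
stream_execution_environment.add_classpaths(
    "file:///my/jar/path/connector1.jar")
```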
or through the command-line argument --jarfile when submitting the job.
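For example (a command-line sketch; the job file and JAR names are illustrative):

```shell
# Submit the PyFlink job together with a (fat) JAR.
./bin/flink run --python my_job.py --jarfile /path/to/my-fat-jar.jar
```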
Note Only one JAR file can be specified with the command-line argument --jarfile, so you need to build a fat JAR if there are multiple JAR files.
You may want to use third-party Python libraries in Python user-defined functions. There are multiple ways to specify the Python libraries.
You could specify them inside the code using the Python Table API as follows:
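For example (a sketch; the file and directory names are illustrative):

```python
# A single Python file:
table_env.add_python_file("/path/to/my_util.py")
# A directory containing a Python package also works:
table_env.add_python_file("/path/to/my_package")
```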
or using the Python DataStream API as follows:
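For example (a sketch; the path is illustrative):

```python
stream_execution_environment.add_python_file("/path/to/my_util.py")
```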
You could also specify the Python libraries using the configuration python.files, or via the command-line arguments -pyfs or --pyFiles when submitting the job.
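For example (a command-line sketch; the file names are illustrative):

```shell
./bin/flink run --python my_job.py \
    -pyfs file:///path/to/my_util.py,file:///path/to/my_package
```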
Note The Python libraries could be local files or local directories. They will be added to the PYTHONPATH of the Python UDF worker.
You can also specify a requirements.txt file which defines the third-party Python dependencies. These dependencies will be installed into the working directory and added to the PYTHONPATH of the Python UDF worker.
You could prepare the requirements.txt file manually as follows:
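For example (the package versions below are illustrative; pin whatever your UDFs actually need):

```shell
echo numpy==1.16.5 >> requirements.txt
echo pandas==1.0.0 >> requirements.txt
```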
or using pip freeze, which lists all the packages installed in the current Python environment:
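For example:

```shell
# Write the currently installed packages and their versions into
# requirements.txt; the result depends on your local environment.
pip freeze > requirements.txt
```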
The content of the requirements.txt file may look like the following:
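For instance (entries and versions are illustrative):

```text
numpy==1.16.5
pandas==1.0.0
```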
You could manually edit it by removing unnecessary entries or adding extra entries, etc.
The requirements.txt file could then be specified inside the code using the Python Table API as follows:
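For example (a sketch; the path is illustrative):

```python
table_env.set_python_requirements(
    requirements_file_path="/path/to/requirements.txt")
```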
or using the Python DataStream API as follows:
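For example (a sketch; the path is illustrative):

```python
stream_execution_environment.set_python_requirements(
    "/path/to/requirements.txt")
```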
Note For dependencies which cannot be accessed in the cluster, a directory containing the installation packages of these dependencies can be specified using the parameter requirements_cache_dir. It will be uploaded to the cluster to support offline installation. You could prepare the requirements_cache_dir as follows:
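For example (a sketch; the directory name is illustrative):

```shell
# Download the installation packages listed in requirements.txt into
# cached_dir so they can be installed offline on the cluster.
pip download -d cached_dir -r requirements.txt --no-binary :all:
```

The directory can then be passed as the second argument of set_python_requirements, e.g. `set_python_requirements("requirements.txt", "cached_dir")`.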
Note Please make sure that the prepared packages match the platform of the cluster and the Python version used.
You could also specify the requirements.txt file using the configuration python.requirements, or via the command-line arguments -pyreq or --pyRequirements when submitting the job.
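For example (a command-line sketch; the `#cached_dir` suffix specifying the offline package directory is optional, and the paths are illustrative):

```shell
./bin/flink run --python my_job.py \
    -pyreq file:///path/to/requirements.txt#file:///path/to/cached_dir
```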
Note The packages specified in the requirements.txt file will be installed using pip, so please make sure that pip (version >= 7.1.0) and setuptools (version >= 37.0.0) are available.
You may also want to specify archive files. The archive files could be used to specify custom Python virtual environments, data files, etc.
You could specify the archive files inside the code using the Python Table API as follows:
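For example (a sketch; the archive name and target directory are illustrative):

```python
# Extract the archive into a directory with the same name as the archive:
table_env.add_python_archive("/path/to/py_env.zip")
# Or extract it into a directory named "myenv":
table_env.add_python_archive("/path/to/py_env.zip", target_dir="myenv")
```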
or using the Python DataStream API as follows:
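For example (a sketch; the names are illustrative):

```python
stream_execution_environment.add_python_archive(
    "/path/to/py_env.zip", "myenv")
```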
Note The parameter target_dir is optional. If specified, the archive file will be extracted into a directory named after target_dir during execution. Otherwise, the archive file will be extracted into a directory with the same name as the archive file.
Suppose you have specified the archive file as follows:
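For example (the archive name and target directory are illustrative):

```python
table_env.add_python_archive("/path/to/data.zip", target_dir="myenv")
```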
Then, you could access the content of the archive file in Python user-defined functions as follows:
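A minimal runnable sketch; the worker-side extraction of data.zip into "myenv" is simulated here so the snippet is self-contained, and the file name is illustrative:

```python
import os

# Simulate the extracted archive layout (done by the worker at runtime).
os.makedirs("myenv", exist_ok=True)
with open("myenv/data.txt", "w") as f:
    f.write("flink")

# Inside a Python UDF, the extracted files are read via a relative path
# under the target_dir "myenv":
def my_udf(name):
    with open("myenv/data.txt") as f:
        return "{}-{}".format(name, f.read())

print(my_udf("hello"))  # hello-flink
```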
If you have not specified the parameter target_dir:
You could then access the content of the archive file in Python user-defined functions as follows:
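A minimal runnable sketch; without target_dir, data.zip is extracted into a directory literally named "data.zip", which is simulated here so the snippet is self-contained:

```python
import os

# Simulate the extracted layout under a directory named "data.zip".
os.makedirs("data.zip", exist_ok=True)
with open("data.zip/data.txt", "w") as f:
    f.write("flink")

def my_udf(name):
    # Relative path starts with the archive file name.
    with open("data.zip/data.txt") as f:
        return "{}-{}".format(name, f.read())
```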
Note The archive file will be extracted into the working directory of the Python UDF worker, so you could access the files inside the archive using relative paths.
You could also specify the archive files using the configuration python.archives, or via the command-line arguments -pyarch or --pyArchives when submitting the job.
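For example (a command-line sketch; the `#myenv` suffix specifies the target directory, and the paths are illustrative):

```shell
./bin/flink run --python my_job.py \
    -pyarch file:///path/to/py_env.zip#myenv
```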
Note If the archive file contains a Python virtual environment, please make sure that the Python virtual environment matches the platform that the cluster is running on.
Note Currently, only zip-format archives are supported, e.g. files with the suffix zip, jar, whl, egg, etc.
You can specify the path of the Python interpreter used to execute the Python worker.
You could specify the Python interpreter inside the code using the Python Table API as follows:
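For example (a sketch using an interpreter shipped inside an archive; the archive name and the relative interpreter path are illustrative):

```python
table_env.add_python_archive("/path/to/py_env.zip", target_dir="myenv")
table_env.get_config().set_python_executable("myenv/py_env/bin/python")
```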
or using the Python DataStream API as follows:
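For example (a sketch; the relative path is illustrative):

```python
stream_execution_environment.set_python_executable(
    "myenv/py_env/bin/python")
```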
It is also possible to use a Python interpreter contained in an archive file.
You could also specify the Python interpreter using the configuration python.executable, or via the command-line arguments -pyexec or --pyExecutable when submitting the job.
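For example (a command-line sketch combining an archived virtual environment with the interpreter path; all names are illustrative):

```shell
./bin/flink run --python my_job.py \
    -pyarch file:///path/to/py_env.zip#myenv \
    -pyexec myenv/py_env/bin/python
```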
Note If the path of the Python interpreter refers to a Python archive file, a relative path should be used instead of an absolute path.
Python is needed at the client side to parse the Python user-defined functions when compiling the job.
You could specify the custom Python interpreter used at the client side by activating it in the current session.
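For example (a sketch; the virtual environment name and job file are illustrative):

```shell
# Activate a virtual environment before compiling/submitting the job.
source my_env/bin/activate
python my_job.py
```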
or specify it using the configuration python.client.executable or the environment variable PYFLINK_CLIENT_EXECUTABLE.
Python user-defined functions can also be used in Java Table API programs or pure SQL programs. The following code shows a simple example of how to use Python user-defined functions in a Java Table API program:
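A hypothetical sketch (not the library's canonical example): it registers a Python scalar function via SQL DDL and then calls it; the module name `my_udfs`, the function `add_one`, and the file path are assumptions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PythonUdfFromJava {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Ship the Python file containing the UDF (path is illustrative).
        tEnv.getConfig().getConfiguration()
            .setString("python.files", "/path/to/my_udfs.py");

        // Register the Python function "my_udfs.add_one" via SQL DDL.
        tEnv.executeSql(
            "CREATE TEMPORARY SYSTEM FUNCTION add_one"
                + " AS 'my_udfs.add_one' LANGUAGE PYTHON");

        // Use the Python UDF like any other function.
        tEnv.executeSql(
            "SELECT add_one(v) FROM (VALUES (1), (2)) AS T(v)").print();
    }
}
```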
You can refer to the CREATE FUNCTION SQL statement for more details on how to create Python user-defined functions using SQL statements.
The Python dependencies could then be specified via the Python config options, such as python.archives, python.files, python.requirements, python.client.executable, python.executable, etc., or through command-line arguments when submitting the job.