Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform.
English | 简体中文
Everyone is welcome to join the Alink open-source user group for discussion.
`pyalink` always maintains the Alink Python API against the latest Flink version, which is currently 1.13, while the `pyalink-flink-***` packages support older Flink versions: `pyalink-flink-1.12`, `pyalink-flink-1.11`, `pyalink-flink-1.10` and `pyalink-flink-1.9` for now. The current package version is 1.6.2.
Install one of them with `pip install pyalink`, `pip install pyalink-flink-1.12`, `pip install pyalink-flink-1.11`, `pip install pyalink-flink-1.10` or `pip install pyalink-flink-1.9`.
Note that `pyalink` and `pyalink-flink-***` cannot be installed at the same time, and multiple versions are not allowed.
If `pyalink` or any `pyalink-flink-***` package is already installed, use `pip uninstall pyalink` or `pip uninstall pyalink-flink-***` to remove it first.
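Since `pyalink` and the `pyalink-flink-***` packages cannot coexist, a quick way to check what is currently installed is the following small standard-library sketch (it only lists distribution names; it does not uninstall anything):

```python
from importlib.metadata import distributions

# List any installed PyAlink distributions so conflicts can be removed first.
installed = sorted(
    dist.metadata["Name"]
    for dist in distributions()
    if (dist.metadata["Name"] or "").lower().startswith("pyalink")
)
print(installed)
```

If the list contains more than one entry, or an entry other than the package you want, remove it with `pip uninstall` before installing the desired package.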
If `pip install` is slow or fails, refer to this article to change the pip source, or use the following download links:
If multiple versions of Python exist, you may need to use a specific version of pip, such as `pip3`;
If Anaconda is used, the command should be run in the Anaconda prompt.
After PyAlink is installed, you can run `download_pyalink_dep_jars` to download the dependency jars for file systems and Hive.
(If you get an error that the command cannot be found, you can run the Python command `python3 -c 'from pyalink.alink.download_pyalink_dep_jars import main;main()'` directly.)
After executing the command, you'll see a prompt asking about the dependencies and the versions to be downloaded.
The following dependencies and jar versions are supported:
These jars will be installed into the `lib/plugins` folder of PyAlink.
Note that these commands require write access to that folder.
You can also add the argument `-d` when executing the command, i.e. `download_pyalink_dep_jars -d`, which installs all dependency jars.
You can use PyAlink with Jupyter Notebook for a better experience.
Steps for usage:
Start Jupyter in a terminal with `jupyter notebook`, and create a Python 3 notebook.
Import the pyalink package: `from pyalink.alink import *`.
Use this command to create a local runtime environment:
`useLocalEnv(parallism, flinkHome=None, config=None)`.
Here, the parameter `parallism` indicates the degree of parallelism used for execution; `flinkHome` is the full path of the Flink home directory and usually does not need to be set; `config` holds configuration parameters accepted by Flink. After running, the following output appears, indicating that the runtime environment was initialized successfully:
JVM listening on ***
Python listening on ***
source = CsvSourceBatchOp()\
.setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\
.setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
res = source.select(["sepal_length", "sepal_width"])
df = res.collectToDataframe()
print(df)
In PyAlink, the interfaces provided by algorithm components are basically the same as the Java APIs: an algorithm component is created through its default constructor, parameters are set through `setXXX` methods, and components are connected through `link` / `linkTo` / `linkFrom`.
Here, Jupyter Notebook's auto-completion mechanism can be used for writing convenience.
For batch jobs, you can trigger execution through methods such as `print` / `collectToDataframe` / `collectToDataframes` of batch components, or through `BatchOperator.execute()`; for streaming jobs, start the job with `StreamOperator.execute()`.
String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
String SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
BatchOperator data = new CsvSourceBatchOp()
.setFilePath(URL)
.setSchemaStr(SCHEMA_STR);
VectorAssembler va = new VectorAssembler()
.setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
.setOutputCol("features");
KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
.setPredictionCol("prediction_result")
.setPredictionDetailCol("prediction_detail")
.setReservedCols("category")
.setMaxIter(100);
Pipeline pipeline = new Pipeline().add(va).add(kMeans);
pipeline.fit(data).transform(data).print();
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.13_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.12_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.11_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.10_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.9_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.0</version>
</dependency>
wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz
tar -xf flink-1.13.0-bin-scala_2.11.tgz && cd flink-1.13.0
./bin/start-cluster.sh
git clone https://github.com/alibaba/Alink.git
# add <scope>provided</scope> in pom.xml of alink_examples.
cd Alink && mvn -Dmaven.test.skip=true clean package shade:shade
./bin/flink run -p 1 -c com.alibaba.alink.ALSExample [path_to_Alink]/examples/target/alink_examples-1.5-SNAPSHOT.jar
# ./bin/flink run -p 1 -c com.alibaba.alink.GBDTExample [path_to_Alink]/examples/target/alink_examples-1.5-SNAPSHOT.jar
# ./bin/flink run -p 1 -c com.alibaba.alink.KMeansExample [path_to_Alink]/examples/target/alink_examples-1.5-SNAPSHOT.jar