ML pipeline提供一组统一的高级API,它们构建在 DataFrame以上可以帮助用户创建和调整实用的机器学习pipeline。
MLlib学习机器算法API标准化使多种算法合并成一个pipeline或者工作流变得更容易。Pipeline概念主要是受scikit-learn启发。
这个ML API使用Spark SQL 的DataFrame作为一个ML它可以容纳各种数据类型的数据集。a DataFrame不同列可以存储文本、特征向量、真实标签和预测值。
ATransformer是可以将一个DataFrame变成另一个DataFrame算法(可安装spark的transform理解)。例如,一个ML模型是一个Transformer,负责将特征DataFrame转化为包含预测值的预测值DataFrame。
An Estimator可以作用于算法的算法DataFrame产生一个Transformer。例如,学习算法是一种Estimator,负责训练DataFrame并生成模型。
Pipeline将多个Transformers和Estimators连接起来确定一个ML工作流程。
所有Transformers和Estimators现在共享一个通用API,用于指定参数。
机器学习可应用于各种数据类型,如向量、文本、图像和结构化数据。Spark Sql的dataframe支持多种数据类型。
Dataframe支持多种基本类型和结构化类型,可参考Spark查看其支持的数据类型列表。此外,除了SparkSql官方支持的数据类型,dataframe还可以支持ML向量类型。
Dataframe可以从一个规则开始RDD隐式地或显式地创建。有关创建实例请参考Spark官网,或等待浪尖后续更新。
DataFrame列式有列名。在后面的例子中会发现列表text,feature,label等
Transformer它是抽象的,包括特征转换器和学习模型。通常,转换器实现了一个transform该方法通过给予Dataframe添加一个或多个列DataFrame转化为另一个Dataframe。
一个特征转换器可以获得一个特征转换器dataframe,读一列(例如,text),然后映射成一个新的列(例如,特征向量)并输出一个新的列dataframe,该dataframe添加转换生成的列。
一个学习模型可以获得一个学习模型dataframe,读取包含特征向量的列,预测每个特征向量的标签,然后生成包含预测标签列的新列dataframe。
Estimator抽象一个学习算法或任何数据fit或者trains操作算法。从技术上讲,一个Estimator实现了一个fit()方法,接受一个方法dataframe并生成一个模型(即一个模型)Transformer)。例如,学习算法,例如:LogisticRegression是一个Estimator,通过调用fit()训练一个LogisticRegressionModel,这是一个模型,也是一个模型Transformer。
Transformer.transform()s和Estimator.fit()s都是无状态的。在未来,可以通过替代概念来支持状态算法。
每个Transformer或者Estimator只有一个ID,该ID在指定参数时有用,以后再讨论。
在机器学习中,一系列算法通常用于处理和学习数据。例如,简单的文本文档处理过程可能包括几个阶段:
-
将每个文档的文本分成单词。
-
将每个文档的单词转换为数字特征向量。
-
使用特征向量和标签学习预测模型。
MLlib把这样的工作流程变成一个pipeline,它包括按顺序执行的一些列PipelineStages (Transformers 和Estimators) 。工作流程将有详细的例子。
每个pipeline指定包含一系列stages,并且每个stage要么是一个Transformer,要么是一个Estimator。这些stage按顺序输入dataframe每一个都被引入stage它将被转换。对于Transformer stages,transform()将调用该方法进行操作Dataframe。对于Estimator stages,fit()方法会被调用,然后产生一个Transformer(其会成为PipelineModel部分,或fitted pipeline-训练好的pipeline),并且那个Transformer的transform该方法将被调用来操作那个Dataframe。
我们用简单的文本文档工作流来解释这一点。
以上一行代表一个Pipeline有三个阶段。前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。最低行代表流经管道的数据,其中圆柱表示DataFrames。Pipeline.fit()方法调用原始操作DataFrame,它包含原始文档和标签。Tokenizer.transform()将原始文本分成单词单词,并在原始文本中添加一个单词dataframe上。HashingTF.transform()将单词列转化为特征向量的方法dataframe增加特征向量列。接着,由于LogisticRegression是一个Estimator,Pipeline先调用LogisticRegression.fit(),生成一个LogisticRegressionModel。如果,Pipeline有更多的Estimators,他会调用LogisticRegressionModel’s transform()方法在Dataframe传入下个stage前去作用于Dataframe。
一个Pipeline是一个Estimator。因此,在pipeline的fit()方法运行后,它会产生一个PipelineModel,其也是一个Transformer。这PipelineModel试验时使用 ; 下图说明了这种用法。
上图中,PipelineModel和原始的Pipeline数量相同stage,但是在原始pipeline中所有的Estimators已经变为了Transformers。当PipelineModel’s transform()如果调用方法再次测试集,数据将按顺序进行fitted pipeline中传输。每个stage的transform方法更新dataset然后将更新后的传输给下一个stage。
-
一个Pipeline的stages定义为顺序数组。目前这里给出的都是线性的Pipelines,即Pipeline每个stage使用前一stage生成的数据。Pipeline只要数据流图形成向无环图(DAG),可创建非线性Pipelines。目前这张图是基于每一张stage隐含指定的输入和输出列名(通常指定为参数)。如果Pipeline形成为DAG,那么stage必须按拓扑顺序指定。
-
由于pipelines具有不同数据类型的操作能力Dataframe,编译时不得使用类型检查。Pipelines 和PipelineModels在正式运行pipeline运行前执行类型检查。使用这种类型的检查Dataframe的schema来实现,schema就是dataframe列数据类型描述。
-
一个Pipeline的stages应该是唯一的例子。相同的myHashingTF不应该在pipeline因为pipeline的stages都包含唯一的IDs。然而,不同的例子myHashingTF1 和myHashingTF2 (类型都是HashingTF)可放入同一个Pipeline,因为不同的例子会不同IDs。
MLlib 的Estimators和Transformers使用统一的API指定参数。Param是包含文档的命名参数。ParamMap是一系列(parameter, value)对。
将参数传输到参数的主要方法有两种法:
例如,lr是LogisticRegression的一个实例,你可以调用来使得最多十次迭代使用。这个API类似于spark.mllib包中使用的API 。
在ParamMap中的任何参数将覆盖以前通过setter方法指定的参数。参数属于Estimators和Transformers的特定实例。例如,如果我们有两个LogisticRegression实例lr1和lr2,然后我们可以建立一个ParamMap与两个maxIter指定的参数:。在一个pipeline中两个算法都使用了maxIter。
通常情况下,将模型或管道保存到磁盘供以后使用是值得的。模型的导入导出功能在spark1.6的时候加入了pipeline API。大多数基础transformers和基本ML models都支持。
model.write.overwrite().save("/opt/spark-logistic-regression-model")
val sameModel = PipelineModel.load("/opt/spark-logistic-regression-model")
2 import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql.Row 准备数据,格式为(label, features) val training = spark.createDataFrame(Seq((1.0, Vectors.dense(0.0, 1.1, 0.1)),(0.0, Vectors.dense(2.0, 1.0, -1.0)),(0.0,Vectors.dense(2.0, 1.3, 1.0)),(1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF("label", "features") 创建一个LogisticRegression实例,该实例是一个Estimator val lr = new LogisticRegression() 使用setter函数设置参数 lr.setMaxIter(10).setRegParam(0.01) 学习一个回归模型,使用存储在lr中的参数 val model1 = lr.fit(training) 由于model1是一个模型(即Estimator生成的Transformer),我们可以查看它在fit()中使用的参数。 打印参数(名称:值)对,其中名称是此 println("Model 1 was fit using parameters: " + model1.parent.extractParamMap) //我们也可以使用ParamMap指定参数, //它支持几种指定参数的方法。 val paramMap = ParamMap(lr.maxIter -> 20).put(lr.maxIter, 30).put(lr.regParam -> 0.1, lr.threshold -> 0.55) // 修改输出列名称 val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") 还可以结合ParamMaps。 val paramMapCombined = paramMap ++ paramMap2 //现在使用paramMapCombined参数学习一个新的模型。 // paramMapCombined覆盖之前通过lr.set *方法设置的所有参数。 val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.parent.extractParamMap) 准备测试数据 val test = spark.createDataFrame(Seq((1.0, Vectors.dense(-1.0, 1.5, 1.3)),(0.0, Vectors.dense(3.0, 2.0, -0.1)),(1.0,Vectors.dense(0.0, 2.2, -1.5)))).toDF("label", "features") //使用Transformer.transform()方法对测试数据进行预测。 // LogisticRegression.transform将仅使用“特征”列。 //注意model2.transform()输出一个'myProbability'列,而不是通常的 //'probability'列,因为之前我们重命名了lr.probabilityCol参数。 model2.transform(test).select("features", "label", "myProbability", "prediction").collect().foreach { case Row(features: Vector,label: Double, prob: Vector, prediction: Double) =>println(s"($features, $label) -> prob=$prob, prediction=$prediction")}
3 import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.Row // 准数据(id, text, label). val training = spark.createDataFrame(Seq((0L, "a b c d e spark", 1.0),(1L, "b d", 0.0),(2L, "spark f g h", 1.0),(3L, "hadoop mapreduce",0.0))).toDF("id", "text", "label") // 配置一个包含三个stage的ML pipeline: tokenizer, hashingTF, and lr. val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001) val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) // 调用fit,训练数据 val model = pipeline.fit(training) // 可以将训练好的pipeline输出到磁盘 model.write.overwrite().save("/opt/spark-logistic-regression-model") // 也可以直接将为进行训练的pipeline写到文件 pipeline.write.overwrite().save("/opt/unfit-lr-model") // 加载到出来 val sameModel = PipelineModel.load("/opt/spark-logistic-regression-model") // (id, text) 这个格式未打标签的数据进行测试 val test = spark.createDataFrame(Seq((4L, "spark i j k"),(5L, "l m n"),(6L, "spark hadoop spark"),(7L, "apache hadoop"))).toDF("id", "text") // 在测试集上进行预测 model.transform(test).select("id", "text", "probability", "prediction").collect().foreach { case Row(id: Long, text: String, prob: Vector,prediction: Double) =>println(s"($id, $text) --> prob=$prob, prediction=$prediction")}