Welcome! flink-jpmml is a new library for dynamic real-time machine learning predictions, built on top of PMML standard models and the Apache Flink streaming engine.
flink-jpmml is easy to use, runs at serious scale, is backend independent and is naturally shaped for streaming scenarios.
To get started, you only need the flink-jpmml dependency. If you use sbt, add one of the following dependencies to your project (snapshot or stable version):
"io.radicalbit" %% "flink-jpmml-scala" % "0.7.0-SNAPSHOT"
"io.radicalbit" %% "flink-jpmml-scala" % "0.6.1"
Maven users can instead add one of the following (snapshot or stable version):
<dependencies>
    <dependency>
        <groupId>io.radicalbit</groupId>
        <artifactId>flink-jpmml-scala</artifactId>
        <version>0.7.0-SNAPSHOT</version>
    </dependency>
</dependencies>
<dependencies>
    <dependency>
        <groupId>io.radicalbit</groupId>
        <artifactId>flink-jpmml-scala</artifactId>
        <version>0.6.1</version>
    </dependency>
</dependencies>
Alternatively, you can publish flink-jpmml to your local repository:
> sbt
> project flink-jpmml-scala
> publishLocal
Keep in mind that you will also need the Flink scala-core, flink-streaming and flink-clients libraries.
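As a minimal sketch, an sbt build that pulls in flink-jpmml together with the Flink libraries mentioned above could look like the following. The Flink artifact names (flink-scala, flink-streaming-scala, flink-clients) and the flinkVersion value are assumptions that do not come from this document; use the Flink version that matches your cluster.

```scala
// build.sbt (sketch): the Flink version below is an assumption, adjust it to your setup
val flinkVersion = "1.3.2" // assumed value, not stated in this README

libraryDependencies ++= Seq(
  "io.radicalbit"    %% "flink-jpmml-scala"     % "0.6.1",
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-clients"         % flinkVersion
)
```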
Let's start.
flink-jpmml
enables Flink users to execute real time predictions based on machine learning models trained by any
system supporting the PMML
standard; this allows efficient streaming model serving along with the powerful Flink engine features.
Since 0.6.0 the project efficiently supports dynamic streaming model serving. For more information, we suggest watching the related talk presented at Flink Forward 2017 in Berlin.
First of all, models are uniquely identified by the ModelId abstraction, made of an applicationName and a version.
For example, suppose you have two PMML-based ensemble models, A and B, where A has depth 10 and width 100 and B has depth 5 and width 200, and you want to compare them: you can identify them with the applicationName SVM and the versions A and B.
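To make the identification scheme concrete, here is a minimal sketch of such an identifier. ModelIdSketch is a hypothetical stand-in, not the library's ModelId class; the string form follows the "<modelApplication>_<modelVersion>" format this document gives for input events, and splitting on the last underscore is an assumption of the sketch.

```scala
// Hypothetical stand-in for the ModelId abstraction; not the flink-jpmml class.
final case class ModelIdSketch(applicationName: String, version: String) {
  // String form following the "<modelApplication>_<modelVersion>" format.
  def key: String = s"${applicationName}_$version"
}

object ModelIdSketch {
  // Assumption: the version is whatever follows the LAST underscore.
  def parse(key: String): Option[ModelIdSketch] = {
    val sep = key.lastIndexOf('_')
    if (sep <= 0 || sep == key.length - 1) None
    else Some(ModelIdSketch(key.substring(0, sep), key.substring(sep + 1)))
  }
}
```

With this shape, the two models of the example share the applicationName and differ only by version, so ModelIdSketch("SVM", "A") and ModelIdSketch("SVM", "B") are distinct identifiers.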
flink-jpmml does not store models within its operator state, only the related metadata.
The operator retrieves models from your distributed backend by exploiting the concept of a metadata table. Your PMML models therefore have to be persisted in a backend system
(see here for supported systems).
If you want to use dynamic model evaluation, you need to define the following two streams.
DataStream[ServingMessage]: this stream is the main tool for feeding the operator with the necessary model information. The user is not required to send PMML models through the stream, only the requested descriptive metadata, and should use the ServingMessage ADT to do so. For now, the user can define the following two messages:
AddMessage: requires an applicationName (formatted as a Java UUID), a version, the model source path and a timestamp.
DelMessage: requires an applicationName (formatted as a Java UUID), a version and a timestamp.
DataStream[BaseEvent]: the events of your input stream should extend the BaseEvent trait, defining a string modelId formatted as "<modelApplication>_<modelVersion>" and a timestamp.
Given the streams above, you can obtain predictions easily:
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.streaming.api.scala._
import io.radicalbit.flink.pmml.scala.models.control.ServingMessage
...
val inputStream: DataStream[_ <: BaseEvent] = yourInputStream
val controlStream: DataStream[ServingMessage] = yourControlStream
val predictions =
  inputStream
    .withSupportStream(controlStream)
    .evaluate { (event, model) =>
      val vector = event.toVector
      val prediction = model.predict(vector)
      prediction
    }
The features of flink-jpmml PMML models are discussed in more detail here: you will find several ways to handle your predictions. All the concepts introduced with the first flink-jpmml release, i.e. how the model is built within the operator, the operator configuration and so forth, have been retained and are described below.
We also kept the single-operator model, explained later, in case you want to bind a specific model to an operator instance.
When an event arrives, its modelId declares which model it needs to be evaluated against.
If that model has not been loaded within the operator yet, the operator uses the metadata to lazily retrieve the targeted model from the underlying distributed backend.
The control stream is the right tool for giving the operator the global picture of the models available on your platform (this fits well with the concept of a model repository server). You use this stream to feed the operator with the information your input events need in order to find their models.
If an event finds its targeted model, the prediction is computed and a Prediction outcome (an ADT) is returned; otherwise the outcome is an EmptyPrediction.
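The flow described above (metadata arriving on the control stream, lazy model retrieval on the first matching event, an empty outcome when no model is known) can be sketched in plain Scala without Flink. Every name below (ControlSketch, OperatorSketch, ScoredSketch, EmptySketch, the load function, the path) is hypothetical and only mimics the behavior described here; it is not the library's implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: none of these are flink-jpmml classes.
sealed trait ControlSketch
final case class AddSketch(modelId: String, path: String) extends ControlSketch
final case class DelSketch(modelId: String) extends ControlSketch

sealed trait OutcomeSketch
final case class ScoredSketch(value: Double) extends OutcomeSketch
case object EmptySketch extends OutcomeSketch

// `load` simulates reading a model from the backend; a "model" is just a function here.
final class OperatorSketch(load: String => (Seq[Double] => Double)) {
  private val metadata = mutable.Map.empty[String, String]                // modelId -> source path
  private val loaded   = mutable.Map.empty[String, Seq[Double] => Double] // models fetched so far
  var loads: Int = 0                                                      // number of backend reads

  // Control messages only touch metadata; no model is read at this point.
  def onControl(msg: ControlSketch): Unit = msg match {
    case AddSketch(id, path) =>
      metadata.update(id, path)
    case DelSketch(id) =>
      metadata.remove(id)
      loaded.remove(id)
      ()
  }

  // An event triggers the load the first time a known model is requested.
  def onEvent(modelId: String, features: Seq[Double]): OutcomeSketch =
    metadata.get(modelId) match {
      case None => EmptySketch
      case Some(path) =>
        val model = loaded.getOrElseUpdate(modelId, { loads += 1; load(path) })
        ScoredSketch(model(features))
    }
}

object LazyServingDemo {
  def main(args: Array[String]): Unit = {
    val op = new OperatorSketch(_ => features => features.sum)
    println(op.onEvent("SVM_A", Seq(1.0, 2.0))) // model not announced yet
    op.onControl(AddSketch("SVM_A", "/hypothetical/path/a.pmml"))
    println(op.onEvent("SVM_A", Seq(1.0, 2.0))) // loaded lazily, then scored
  }
}
```

Keeping only the path in the metadata map is what makes the control messages cheap: the expensive read happens once, on the first event that actually needs the model.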
Suppose you have an input stream and you want to score its data:
import org.apache.flink.streaming.api.scala._
case class InputEvent(data: Array[Double])
val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[InputEvent] = env.yourInputStream
You can achieve this easily with the following:
// This will be all that you need
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.streaming.api.scala._
object FlinkJpmmlExample {
  def main(args: Array[String]): Unit = {
    // your model can reside in any Flink supported backend
    val pathToPmml = "/even/to/distributed/systems"
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.yourInputStream
    // a lazy reader implementation
    val reader = ModelReader(pathToPmml)
    // let's go with predictions
    events.evaluate(reader) { (event, model) =>
      // FlinkML Vector abstraction is used
      val toBePredicted = vectorize(event)
      // and here we are
      val prediction: Prediction = model.predict(toBePredicted)
      val result = prediction.value
      // finally custom returns
      (event, result)
    }
    env.execute("Awesome predictions with flink-jpmml")
  }
}
Some useful insights from the code:
ModelReader is a lazy reader that provides the right reading abstraction to the TaskManagers.
PMMLModel is loaded once, by a factory, for each TaskManager running on your architecture, at construction time.
The PmmlModel.predict method expects Flink Vectors as input events and, if you want to manage NaNs, an optional replace value.
Prediction provides the result of evaluating the input event against the PMML model as a Prediction[Double] ADT, so if the model can't produce a prediction it returns an EmptyPrediction value.
flink-jpmml also provides a quick prediction function that runs against a stream of Flink Vectors:
...
val vectorStream: DataStream[Vector] = events.map(event => vectorize(event))
val predictions: DataStream[(Prediction, Vector)] = vectorStream.quickEvaluate(reader)
// keep only the prediction of each (prediction, input vector) pair
predictions.map(_._1).print()
env.execute("flink-jpmml quick predictions")
These basic examples show you how to interact with the library. The following sections address some interesting details that are worth a deeper analysis.
The main effort of flink-jpmml is to retain all the streaming concepts:
ModelReader is the object implementing the behavior described above: it provides the loading methods but reads the model lazily, i.e. only when the transformation is applied.
PMMLModel is loaded by a singleton model factory for each TaskManager running on your architecture. This means that if you have an active TaskManager A made up of 4 TaskSlots, the TaskManager loads the model through a single loader entity. This is crucial to let the system scale in a thread-safe way, since even simple PMML models can grow to several hundred megabytes depending on the model size, which is a heavy load in memory terms.
The PmmlModel.predict method expects Flink Vectors as input events. This choice lets us leverage the underlying Breeze implementation, and no reflection is applied at all. Moreover, the user doesn't have to specify any key-value structure: your data match a feature vector, so the former is evaluated against the latter (see the input discussion section for further details).
flink-jpmml can also handle sparse data: just pass the desired replace value as an argument to the method discussed above (here you will need a SparseVector):
val prediction = model.predict(sparseData, replaceValue)
Prediction is the output case class: it returns the result of evaluating the input event against the PMML model as a Score[Double] ADT, so if the model can't produce a prediction it returns an EmptyScore.
The design deserves a bit more attention: the choice of a UDF as the prediction entry point is justified by the need to handle a machine learning task (a prediction task) within a pure streaming application. In this way the user can manage predictions in the body of the function, together with the original event.
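The "one loader per TaskManager" idea can be illustrated with a plain JVM singleton. The sketch below is an assumption about how such a factory might look, not the library's code: ModelFactorySketch and HeavyModelSketch are hypothetical names, the threads stand in for task slots, and the path is made up. It relies on ConcurrentHashMap.computeIfAbsent, which applies the loading function at most once per key.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical placeholder for a large in-memory model.
final class HeavyModelSketch(val path: String)

// A Scala object is a singleton within one JVM (and classloader),
// so every thread of the same process shares this cache.
object ModelFactorySketch {
  val loadCount = new AtomicInteger(0)
  private val cache = new ConcurrentHashMap[String, HeavyModelSketch]()

  def getOrLoad(path: String): HeavyModelSketch =
    cache.computeIfAbsent(path, new JFunction[String, HeavyModelSketch] {
      // Runs at most once per path, even under concurrent access.
      override def apply(p: String): HeavyModelSketch = {
        loadCount.incrementAndGet()
        new HeavyModelSketch(p)
      }
    })
}
```

If four threads call ModelFactorySketch.getOrLoad with the same path, they all receive the same instance and the model is materialized only once, which is the memory saving the paragraph above describes.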
Assume this is the PMML feature vector under consideration:
["sepal_width", "sepal_length", "petal_width", "petal_length"]
and you have values for all these fields; so, just create a DenseVector
val vector: DenseVector = DenseVector(value1, value2, value3, value4)
Suppose value2 is missing; in that case you will need a SparseVector:
val vector: SparseVector = SparseVector(4, Array(0, 2, 3), Array(value1, value3, value4))
flink-jpmml recognizes missing values and replaces them with replaceValue if it is specified (as the second argument of the PmmlModel.predict method); otherwise the handling of NaNs is delegated to the PMML model.
Note also that when you use a DenseVector, flink-jpmml assumes that the size of the dense instance is your model size; if the two sizes do not match, it will not predict anything (i.e. it returns Score.Empty). In the case of sparse instances, the library reads the size value of the sparse vector.
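The input rules above can be sketched without FlinkML types. The helper below is purely illustrative (InputSketch and prepare are hypothetical names, and the field list is the iris example from this section): it maps sparse entries onto the model fields, fills the gaps with the replace value when one is given, leaves them out otherwise, and yields nothing when the declared size does not match the number of model fields.

```scala
// Hypothetical helper, not part of flink-jpmml.
object InputSketch {
  val fields: Vector[String] =
    Vector("sepal_width", "sepal_length", "petal_width", "petal_length")

  /** Returns None when `size` disagrees with the number of model fields. */
  def prepare(size: Int,
              indices: Array[Int],
              values: Array[Double],
              replaceValue: Option[Double]): Option[Map[String, Double]] =
    if (size != fields.size) None
    else {
      val present = indices.zip(values).toMap
      val pairs = fields.zipWithIndex.flatMap { case (name, i) =>
        // Use the provided value, else the replace value, else skip the field.
        present.get(i).orElse(replaceValue).map(v => name -> v)
      }
      Some(pairs.toMap)
    }
}
```

For instance, InputSketch.prepare(4, Array(0, 2, 3), Array(5.1, 1.4, 0.2), Some(0.0)) fills sepal_length with 0.0, while passing None as the replace value leaves that field absent so that the model itself has to deal with the missing input.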
flink-jpmml handles prediction issues
flink-jpmml won't break your job if something goes wrong, with the exception of the model loading step: loading is such a crucial action (it is mandatory for predictions) that a failure raises a ModelLoadingException.
Any other issue is reported through log messages.
The handled failures are:
If you want to contribute to the project, send an email to [email protected] or just open an issue here. The flink-jpmml community is looking for you!
This library has been published under the GNU Affero General Public License (AGPL) version 3.0, following the official advice of the Apache Software Foundation about the compatibility between the Apache License, Version 2.0 and the GNU General Public License, Version 3.0.
<SUB>THIS STANDARD IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS RELEASE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.</SUB>