Scikit-learn on PySpark
Project description
Sparkit-learn
PySpark + Scikit-learn = Sparkit-learn
About
Sparkit-learn aims to provide scikit-learn functionality and API on PySpark. The main goal of the library is to create an API that closely resembles the one of sklearn.
Its driving principle is "Think locally, execute distributively." To accommodate this concept, the basic data block is always an array or a (sparse) matrix, and operations are executed at the block level.
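As a minimal illustration of the principle (a sketch using plain PySpark and NumPy only, not splearn's API), each partition's elements can be packed into a NumPy block, and an ordinary local operation then runs once per block:

```python
import numpy as np

# Minimal sketch of the block principle with plain PySpark + NumPy.
# `sc` is an existing SparkContext, as in the examples below.
rdd = sc.parallelize(range(20), 2)

# Pack each partition's elements into a single NumPy "block".
blocks = rdd.mapPartitions(lambda part: [np.array(list(part))])

# "Think locally": np.sum is an ordinary in-memory operation.
# "Execute distributively": it runs once per block, across the cluster.
blocks.map(np.sum).collect()  # [45, 145]
```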
Requirements
Python 2.7.x or 3.4.x
Spark[>=1.3.0]
NumPy[>=1.9.0]
SciPy[>=0.14.0]
Scikit-learn[>=0.16]
Run IPython from the notebooks directory:
```bash
PYTHONPATH=${PYTHONPATH}:.. IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --master local\[4\] --driver-memory 2G
```
Run the tests with:
```bash
./runtests.sh
```
Quick start
Sparkit-learn introduces three important distributed data formats:
ArrayRDD
A numpy.array-like distributed array.
```python
from splearn.rdd import ArrayRDD

data = range(20)

# PySpark RDD with 2 partitions
rdd = sc.parallelize(data, 2)  # each partition with 10 elements

# ArrayRDD
# each partition will contain blocks with 5 elements
X = ArrayRDD(rdd, bsize=5)  # 4 blocks, 2 in each partition
```
Basic operations:
```python
len(X)  # 20 - number of elements in the whole dataset
X.blocks  # 4 - number of blocks
X.shape  # (20,) - the shape of the whole dataset

X  # returns an ArrayRDD
# <class 'splearn.rdd.ArrayRDD'> from PythonRDD...

X.dtype  # returns the type of the blocks
# numpy.ndarray

X.collect()  # get the dataset
# [array([0, 1, 2, 3, 4]),
#  array([5, 6, 7, 8, 9]),
#  array([10, 11, 12, 13, 14]),
#  array([15, 16, 17, 18, 19])]

X[1].collect()  # indexing
# [array([5, 6, 7, 8, 9])]

X[1]  # also returns an ArrayRDD!

X[1::2].collect()  # slicing
# [array([5, 6, 7, 8, 9]),
#  array([15, 16, 17, 18, 19])]

X[1::2]  # returns an ArrayRDD as well

X.tolist()  # returns the dataset as a list
# [0, 1, 2, ... 17, 18, 19]
X.toarray()  # returns the dataset as a numpy.array
# array([ 0,  1,  2, ... 17, 18, 19])

# pyspark.rdd operations will still work
X.getNumPartitions()  # 2 - number of partitions
```
SparseRDD
The sparse counterpart of the ArrayRDD; the main difference is that its blocks are sparse matrices. The reason behind this split is to follow the distinction between numpy.ndarray and scipy.sparse matrices. Usually a SparseRDD is created by splearn's transformers, but it can be instantiated manually as well.
```python
# generate a SparseRDD from a text using SparkCountVectorizer
from splearn.rdd import SparseRDD
from sklearn.feature_extraction.tests.test_text import ALL_FOOD_DOCS

ALL_FOOD_DOCS
# (u'the pizza pizza beer copyright',
#  u'the pizza burger beer copyright',
#  u'the the pizza beer beer copyright',
#  u'the burger beer beer copyright',
#  u'the coke burger coke copyright',
#  u'the coke burger burger',
#  u'the salad celeri copyright',
#  u'the salad salad sparkling water copyright',
#  u'the the celeri celeri copyright',
#  u'the tomato tomato salad water',
#  u'the tomato salad water copyright')

# ArrayRDD created from the raw data
X = ArrayRDD(sc.parallelize(ALL_FOOD_DOCS, 4), 2)
X.collect()
# [array([u'the pizza pizza beer copyright',
#         u'the pizza burger beer copyright'], dtype='<U31'),
#  array([u'the the pizza beer beer copyright',
#         u'the burger beer beer copyright'], dtype='<U33'),
#  array([u'the coke burger coke copyright',
#         u'the coke burger burger'], dtype='<U30'),
#  array([u'the salad celeri copyright',
#         u'the salad salad sparkling water copyright'], dtype='<U41'),
#  array([u'the the celeri celeri copyright',
#         u'the tomato tomato salad water'], dtype='<U31'),
#  array([u'the tomato salad water copyright'], dtype='<U32')]

# Feature extraction executed
from splearn.feature_extraction.text import SparkCountVectorizer
vect = SparkCountVectorizer()
X = vect.fit_transform(X)

# and we have a SparseRDD
X
# <class 'splearn.rdd.SparseRDD'> from PythonRDD...

# its type is scipy.sparse's general parent
X.dtype
# scipy.sparse.base.spmatrix

# slicing works just like in ArrayRDDs
X[2:4].collect()
# [<2x11 sparse matrix of type '<type 'numpy.int64'>'
#   with 7 stored elements in Compressed Sparse Row format>,
#  <2x11 sparse matrix of type '<type 'numpy.int64'>'
#   with 9 stored elements in Compressed Sparse Row format>]

# general mathematical operations are available
X.sum(), X.mean(), X.max(), X.min()
# (55, 0.45454545454545453, 2, 0)

# even with axis parameters provided
X.sum(axis=1)
# matrix([[5],
#         [5],
#         [6],
#         [5],
#         [5],
#         [4],
#         [4],
#         [6],
#         [5],
#         [5],
#         [5]])

# It can be transformed to a dense ArrayRDD
X.todense()
# <class 'splearn.rdd.ArrayRDD'> from PythonRDD...
X.todense().collect()
# [array([[1, 0, 0, 0, 1, 2, 0, 0, 1, 0, 0],
#         [1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0]]),
#  array([[2, 0, 0, 0, 1, 1, 0, 0, 2, 0, 0],
#         [2, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0]]),
#  array([[0, 1, 0, 2, 1, 0, 0, 0, 1, 0, 0],
#         [0, 2, 0, 1, 0, 0, 0, 0, 1, 0, 0]]),
#  array([[0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0],
#         [0, 0, 0, 0, 1, 0, 2, 1, 1, 0, 1]]),
#  array([[0, 0, 2, 0, 1, 0, 0, 0, 2, 0, 0],
#         [0, 0, 0, 0, 0, 0, 1, 0, 1, 2, 1]]),
#  array([[0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1]])]

# One can instantiate a SparseRDD manually too:
import numpy as np
import scipy.sparse as sp

sparse = sc.parallelize(np.array([sp.eye(2).tocsr()] * 20), 2)
sparse = SparseRDD(sparse, bsize=5)
sparse
# <class 'splearn.rdd.SparseRDD'> from PythonRDD...

sparse.collect()
# [<10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>,
#  <10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>,
#  <10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>,
#  <10x2 sparse matrix of type '<type 'numpy.float64'>'
#   with 10 stored elements in Compressed Sparse Row format>]
```
DictRDD
A column-based data format where each column has its own type.
```python
import numpy as np
from splearn.rdd import DictRDD

X = range(20)
y = list(range(2)) * 10

# PySpark RDD with 2 partitions
X_rdd = sc.parallelize(X, 2)  # each partition with 10 elements
y_rdd = sc.parallelize(y, 2)  # each partition with 10 elements

# DictRDD
# each partition will contain blocks with 5 elements
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            bsize=5,
            dtype=[np.ndarray, np.ndarray])  # 4 blocks, 2/partition
# if no dtype is provided, the type of the blocks will be determined
# automatically

# or:
data = np.array([range(20), list(range(2)) * 10]).T
rdd = sc.parallelize(data, 2)
Z = DictRDD(rdd,
            columns=('X', 'y'),
            bsize=5,
            dtype=[np.ndarray, np.ndarray])
```
Basic operations:
```python
len(Z)  # 8 - number of blocks
Z.columns  # returns ('X', 'y')
Z.dtype  # returns the types in correct order
# [numpy.ndarray, numpy.ndarray]

Z  # returns a DictRDD
# <class 'splearn.rdd.DictRDD'> from PythonRDD...

Z.collect()
# [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
#  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
#  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0])),
#  (array([15, 16, 17, 18, 19]), array([1, 0, 1, 0, 1]))]

Z[:, 'y']  # column select - returns an ArrayRDD
Z[:, 'y'].collect()
# [array([0, 1, 0, 1, 0]),
#  array([1, 0, 1, 0, 1]),
#  array([0, 1, 0, 1, 0]),
#  array([1, 0, 1, 0, 1])]

Z[:-1, ['X', 'y']]  # slicing - returns a DictRDD
Z[:-1, ['X', 'y']].collect()
# [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
#  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
#  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0]))]
```
Basic workflow
With the data structures described above, the basic workflow is almost identical to sklearn's.
Distributed vectorization of texts
SparkCountVectorizer
```python
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkCountVectorizer
from sklearn.feature_extraction.text import CountVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local = CountVectorizer()
dist = SparkCountVectorizer()

result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd)  # SparseRDD
```
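Since both vectorizers build the same sorted vocabulary, the distributed result can be gathered back to the driver and checked against the local one. A hedged sketch (the stacking step below is ours, not part of splearn's API):

```python
import numpy as np
import scipy.sparse as sp

# Gather the SparseRDD's blocks to the driver, stack them back into a
# single local matrix, and compare with the purely local result.
dist_matrix = sp.vstack(result_dist.collect())
assert np.array_equal(result_local.toarray(), dist_matrix.toarray())
```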
SparkHashingVectorizer
```python
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from sklearn.feature_extraction.text import HashingVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local = HashingVectorizer()
dist = SparkHashingVectorizer()

result_local = local.fit_transform(X)
result_dist = dist.fit_transform(X_rdd)  # SparseRDD
```
SparkTfidfTransformer
```python
from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer())
))

result_local = local_pipeline.fit_transform(X)
result_dist = dist_pipeline.fit_transform(X_rdd)  # SparseRDD
```
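Assuming SparkPipeline mirrors sklearn's Pipeline API (which the library aims for), the fitted distributed pipeline should also be able to transform previously unseen documents. A hedged sketch with hypothetical new texts:

```python
# Hypothetical new documents; transform() is assumed to mirror
# sklearn.pipeline.Pipeline's behaviour for transformer-only pipelines.
X_new_rdd = ArrayRDD(sc.parallelize(["some new text", "another one"], 2))
result_new = dist_pipeline.transform(X_new_rdd)  # expected: SparseRDD
```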
Distributed Classifiers
```python
import numpy as np

from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
y = [...]  # list of labels
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
```
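To evaluate the distributed predictions locally, the prediction RDD can be gathered to the driver. A hedged sketch, assuming predict returns an ArrayRDD so that toarray() (shown in the quick start) applies:

```python
from sklearn.metrics import accuracy_score

# Gather labels and predictions to the driver for local scoring.
y_true = Z[:, 'y'].toarray()    # column select, then collect as an array
y_pred = y_pred_dist.toarray()  # assumes predict returned an ArrayRDD
print(accuracy_score(y_true, y_pred))
```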
Distributed Model Selection
```python
import numpy as np

from splearn.rdd import DictRDD
from splearn.grid_search import SparkGridSearchCV
from splearn.naive_bayes import SparkMultinomialNB

from sklearn.grid_search import GridSearchCV
from sklearn.naive_bayes import MultinomialNB

X = [...]
y = [...]
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

parameters = {'alpha': [0.1, 1, 10]}
fit_params = {'classes': np.unique(y)}

local_estimator = MultinomialNB()
local_grid = GridSearchCV(estimator=local_estimator,
                          param_grid=parameters)

estimator = SparkMultinomialNB()
grid = SparkGridSearchCV(estimator=estimator,
                         param_grid=parameters,
                         fit_params=fit_params)

local_grid.fit(X, y)
grid.fit(Z)
```
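After fitting, the search results can be inspected. A hedged sketch, assuming SparkGridSearchCV exposes the same attributes as sklearn's GridSearchCV (its API is modelled on sklearn's):

```python
# Standard GridSearchCV attributes; assumed to carry over to the
# distributed version.
print(local_grid.best_params_, local_grid.best_score_)
print(grid.best_params_, grid.best_score_)
```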
Special thanks
scikit-learn community
spylearn community
pyspark community
Project details
Hashes for sparkit-learn-0.2.6.tar.gz

Algorithm | Hash digest
---|---
SHA256 | d74be2e7600087d248831964c32737b7a9b6f1e0a702a45b6d19b1f3d397bd03
MD5 | 852214c341ad76fc16b9d4ddec300fd7
BLAKE2b-256 | ec19af25d5939ef5de3860371a4b8d09b50d3c5e1bd1329e78c6160e79fd593e