Clojure的Map-Reduce怎么理解
这篇文章主要介绍“Clojure的Map-Reduce怎么理解”,在日常操作中,相信很多人在Clojure的Map-Reduce怎么理解问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Clojure的Map-Reduce怎么理解”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
10年积累的成都做网站、网站设计经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有高安免费网站建设让你可以放心的选择与我们合作。
PigPen 是什么?
一种看起来和用起来跟 clojure.core 都很像的 map-reduce 语言
可以把 map-reduce 的查询当成程序来写,而不是当成脚本来写
为单元测试和迭代部署提供强大的支持
注意:如果你对 Clojure 不是很熟悉,我们强烈推荐你试下这里,这里 或者 这里 的教程来了解一些 基础。
真的又是一种 map-reduce 语言吗?
如果你会 Clojure,你就已经会 PigPen 了
PigPen 的主要目标是要把语言带出等式的行列。PigPen 的操作符设计的和 Clojure 里尽可能的相似,没有特殊的用户自定义函数(UDFs)。只需要定义函数(匿名的或者命名的),然后你就能像在 Clojure 程序里一样使用它们。
这里有个常用的 word count 的例子:
(require '[pigpen.core :as pig]) (defn word-count [lines] (->> lines (pig/mapcat #(-> % first (clojure.string/lower-case) (clojure.string/replace #"[^\w\s]" "") (clojure.string/split #"\s+"))) (pig/group-by identity) (pig/map (fn [[word occurrences]] [word (count occurrences)]))))
这段代码定义了一个函数,这个函数返回一个 PigPen 的查询表达式。这个查询接受一系列的行作为输入,返回每个单词出现的次数。你可以看到这只是一个 word count 的逻辑,并没有设计到一些外部的东西,比如数据从哪里来的,会产生哪些输出。
可以组合吗?
当然。PigPen 的查询是写成函数的组合——数据输入、输出。只需要写一次,不需要到处复制、粘贴。
现在我们利用以上定义的 word-count 函数,加上 load 和 store 命令,组成一个 PigPen 的查询:
(defn word-count-query [input output] (->> (pig/load-tsv input) (word-count) (pig/store-tsv output)))
这个函数返回查询的 PigPen 表示,他自己不会做什么,我们需要从本地执行它,或者生成一个脚本(之后会讲)。
你喜欢单元测试?我们可以做
利用 PigPen,你可以 mock 输入数据来为你的查询写单元测试。再也不需要交叉着手指想象提交到 cluster 上后会发生什么,也不需要截出部分文件来测试输入输出。
Mock 数据真的很容易,通过 pig/return 和 pig/constantly,你可以在你的脚本里注入任意的数据作为起始点。
一个常用的模式是利用 pig/take 来从实际数据源中抽样出几行,用 pig/return 把结果包一层,就得到了 mock 数据。
(use 'clojure.test) (deftest test-word-count (let [data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])] (is (= (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]))))
pig/dump 操作符会在本地执行查询。
闭包
向你的查询传参数很麻烦,所有函数范围内的变量或者 let 的绑定在函数里都可用。
(defn reusable-fn [lower-bound data] (let [upper-bound (+ lower-bound 10)] (pig/filter (fn [x] (< lower-bound x upper-bound)) data)))
注意 lower-bound 和 upper-bound 在生成脚本的时候就有了,在 cluster 上执行函数的时候也能使用。
那么我怎么用它呢?
只要告诉 PigPen 哪里会把一个查询写成一个 Pig 脚本:
(pig/write-script "word-count.pig" (word-count-query "input.tsv" "output.tsv"))
这样你就能得到一个可以提交到 cluster 上运行的 Pig 脚本。这个脚本会用到 pigpen.jar,这是一个加入所有依赖的 uberjar,所以要保证这个 jar 也一起被提交了。还可以把你的整个 project 打包成一个 uberjar 然后提交,提交之前记得先重命名。怎么打包成 uberjar 请参照教程。
之前看到,我们可以用 pig/dump 来本地运行查询,返回 Clojure 数据:
=> (def data (pig/return [["The fox jumped over the dog."] ["The cow jumped over the moon."]])) #'pigpen-demo/data => (pig/dump (word-count data)) [["moon" 1] ["jumped" 2] ["dog" 1] ["over" 2] ["cow" 1] ["fox" 1] ["the" 4]]
如果你现在就像开始,请参照 getting started & tutorials。
为什么我需要 Map-Reduce?
Map-Reduce 对于处理单台机器搞不定的数据是很有用,有了 PigPen,你可以像在本地处理数据一样处理海量数据。Map-Reduce 通过把数据分散到可能成千上万的集群节点来达到这一目的,这些节点每个都会处理少量的数据,所有的处理都是并行的,这样完成一个任务就比单台机器快得多。像 join 和 group 这样的操作,需要多个节点数据集的协调,这种情况会通过公共的 join key 把数据分到同一个分区计算,join key 的同一个值会送到同一个指定的机器。一旦机器上得到了所有可能的值,就能做 join 的操作或者做其他有意思的事。
想看看 PigPen 怎么做 join 的话,就来看看 pig/cogroup 吧。cogroup 接受任意数量的数据集然后根据一个共同的 key 来分组。假设我们有这样的数据:
foo: {:id 1, :a "abc"} {:id 1, :a "def"} {:id 2, :a "abc"} bar: [1 42] [2 37] [2 3.14] baz: {:my_id "1", :c [1 2 3]]}
如果想要根据 id 分组,可以这样:
(pig/cogroup (foo by :id) (bar by first) (baz by #(-> % :my_id Long/valueOf)) (fn [id foos bars bazs] ...))
前三个参数是要 join 的数据集,每一个都会指定一个函数来从数据源中选出 key。最后的一个参数是一个函数,用来把分组结果结合起来。在我们的例子中,这个函数会被调用两次:
[1 ({:id 1, :a "abc"}, {:id 1, :a "def"}) ([1 42]) ({:my_id "1", :c [1 2 3]]})] [2 ({:id 2, :a "abc"}) ([2 37] [2 3.14]) ()]
这把所有 id 为 1 的值和 id 为 2 的值结合在了一起。不同的键值被独立的分配到不同的机器。默认情况下,key 可以不在数据源中出现,但是有选项可以指定必须出现。
Hadoop 提供了底层的接口做 map-reduce job,但即便如此还是有限制的,即一次只会运行一轮 map-reduce,没有数据流和复杂查询的概念。Pig 在 Hadoop 上抽象出一层,但到目前为止,它仍旧只是一门脚本语言,你还是需要用 UDF 来对数据做一些有意思的事情。PigPen 更进一步的做了抽象,把 map-reduce 做成了一门语言。
如果你刚接触 map-reduce,我们推荐你看下这里。
做 PigPen 的动机
**代码重用。**我们希望能定义一段逻辑,然后通过穿参数把它用到不同的 job 里。
**代码一体化。**我们不想在脚本和不同语言写的 UDF。 之间换来换去,不想考虑不同数据类型在不同语言中的对应关系。
**组织好代码。**我们想把代码写在多个文件里,想怎么组织怎么组织,不要被约束在文件所属的 job 里。
**单元测试。**我们想让我们的抽样数据关联上我们的单元测试,我们想让我们的单元测试在不存取数据的情况下测试业务逻辑。
**快速迭代。**我们想能够在任何时候注入 mock data,我们想在不用等 JVM 启动的情况下测试一个查询。
**只给想要命名的东西命名。**大部分 map-reduce 语言对中间结果要求命名和指定数据结构,这使得用 mock data 来测试单独的 job 变得困难。我们想要在我们觉得合适的地方组织业务逻辑并命名,而不是受语言的指使。
我们受够了写脚本,我们想要写程序。
注意:PigPen 不是一个 Clojure 对 Pig 脚本的封装,很有可能产生的脚本是人看不懂的。
设计和功能
PigPen 设计的和 Clojure 尽可能保持一致。Map-Reduce 是函数式编程,那为什么不利用一门已存在的强大的函数式编程语言呢?这样不光学习曲线低,而且大多数概念也能更容易的应用到大数据上。
在 PigPen 中,查询被当做 expression tree 处理,每个操作符都被表示需要的行为信息的 map,这些 map 可以嵌套在一起组成一个复杂查询的树形表式。每个命令包含了指向祖命令的引用。在执行的时候,查询树会被转化成一个有向无环的查询图。这可以很容易的合并重复的命令,优化相关命令的顺序,并且可以利用 debug 信息调试查询。
优化
去重当我们把查询表示成操作图的时候,去重是一件很麻烦的事。Clojure 提供了值相等的操作,即如果连个对象的内容相同,它们就相等。如果两个操作有相同的表示,那它们完全相同,所以在写查询的时候不用担心重复的命令,它们在执行之前都会被优化。
举个例子,假设我们有这样两个查询:
(let [even-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter even?) (pig/store-clj "even-squares.clj")) odd-squares (->> (pig/load-clj "input.clj") (pig/map (fn [x] (* x x))) (pig/filter odd?) (pig/store-clj "odd-squares.clj"))] (pig/script even-squares odd-squares))
在这个查询中,我们从一个文件加载数据,计算每个数的平方,然后分成偶数和奇数,操作图看起来是这样: 在此输入图片描述
这符合我们的查询,但是做了很多额外的工作。我们加载了 input.clj
两次,所有数的平方也都计算了两次。这看上去可能没有很多工作,但是当你对很多数据做这样的事情,简单的操作累加起来就很多。为了优化这个查询,我们可以找出相同的操作。看第一眼发现我们计算平方的操作可能是一个候选,但是他们有不同的父节点,因此不能把他们合并在一起。但是我们可以把加载函数合并,因为他们没有父节点,而且他们加载相同的文件。
现在我们的图看起来是这样:
现在我们值加载一次数据,这会省一些时间,但还是要计算两次平方。因为我们现在只有一个加载的命令,我们的 map 操作现在相同,可以合并:
这样我们就得到了一个优化过的查询,每个操作都是唯一的。因为我们每次只会合并一个命令,我们不会修改查询的逻辑。你可以很容易的生成查询,而不用担心重复的执行,PigPen 对重复的部分只会执行一次。
序列化当我们用 Clojure 处理完数据以后,数据必须序列化成二进制字节,Pig 才能在集群的机器间传数据。这对 PigPen 是一个很昂贵但是必须的过程。幸运的是一个脚本中经常有很多连续的操作可以合成一个操作,这对于不必要的序列化和反序列化节省了很多时间。例如,任意连续的 map,filter 和 mapcat 操作都可以被重写成一个单独的 mapcat 操作。
我们通过一些例子来说明:
在这个例子中,我们从一个序列化的值(蓝色)4开始,对它反序列化(橙色),执行我们的 map 函数,然后再把它序列化。
现在我们来试一个稍微复杂一点的(更现实的)例子。在这个例子中,我们执行一个 map,一个 mapcat 和一个 filter 函数。
如果你以前没用过 mapcat,我可以告诉你这是对一个值运行一个函数然后返回一串值的操作。那个序列会被 flatten,每个值都会传给下一步使用。在 Clojure 里,那是 map 和 concat 联合之后的结果,在 Scala 里,这叫做 flatMap,而在 C# 里叫 selectMany。
在下图中,左边的流程是我们优化之前的查询,右边的是优化之后的。和第一个例子一样,我们同样从 4 开始,计算平方,然后对这个值做减一的操作,返回本身和加一的操作。Pig 会得到这个值的集合然后做 flatten,使每个值都成为下一步的输入。注意在和 Pig 交互的时候我们要序列化和反序列化。第三步,也就是最后一步对数据进行过滤,在这个例子中我们只保留奇数值。如图所示,我们在任意两步之间都序列化和反序列化数据。
右边的图显示了优化后的结果。每个操作都返回了一个元素序列。map 操作返回一个只有单元素 16 的序列,mapcat 也一样,过滤操作返回 0 元素或单元素的序列。通过是这些命令保持一致,我们可以很容易的把他们合并到一起。我们在一套命令中flattrn 了更多的值序列,但是在步骤之间没有序列化的消耗。虽然卡起来更复杂,但是这个优化是每个步骤都执行的更快了。
测试,本地执行,以及调试
交互式开发,测试,以及可调试性是 PigPen 的关键功能。如果你有一个一次运行好几天的 job,那你最不想看到的是跑了十一个小时后冒出来一个 bug。PigPen 有个基于 rx 的本地运行模式。这可以让我们对查询写单元测试。这样我们可以更有把握的知道运行的时候不会挂掉,并且能返回期待的值。更牛逼的是这个功能可以让我们进行交互式的开发。
通常情况下,我们刚开始会从数据源中选一些记录来做单元测试。因为 PigPen 在 REPL 中返回数据,我们不需要额外构造测试数据。这样,通过 REPL,我们可以根据需要对 mock 数据做 map,filter,join 和 reduce 的操作。每个步骤都可以验证结果是不是我们想要的。这种方法相对于写一长串脚本然后凭空想象能产生更可靠的数据。还有一个有用的地方是可以把复杂的查询写成几个较小的函数单元。Map-reduce 查询随着数据源的量级可能产生剧烈的增加或减少。当你把脚本作为一个整体测试的时候,你可能要读一大堆数据,最后产生一小撮数据。通过把查询细化成较小的单元,你可以对读 100 行,产生 2 行这样子来测试一个单元,然后测试第二个单元的时候可以用这两行作为模板来产生 100 多个数据。
调试模式对于解决异常很有用,启用后会在正常输出的同时,把脚本中每个操作的结果写到磁盘上。这对于像 Hadoop 这样的环境很有用,在这种情况下,你没法单步跟踪代码,而且每个步骤都可能花好几个小时。调试模式还可以可视化流程图。这样可以可视化的把执行计划的和实际操作的输出关联起来。
要启用调试模式,请参考 pig/write-script 和 pig/generate-script 的选项,这会在指定的目录下写额外的调试输出。
启用调试模式的例子:
(pig/write-script {:debug "/debug-output/"} "my-script.pig" my-pigpen-query)
要启用可视化模式,可以看看 pig/show 和 pig/dump&show。
可视化的例子:
(pig/show my-pigpen-query) ;; Shows a graph of the query (pig/dump&show my-pigpen-query) ;; Shows a graph and runs it locally
扩展 PigPen
PigPen 有个好用的功能是可以很容易的创建自己的操作符。例如,我们可以定义像求差集和交集这样的集合和多集合的操作符,这些只是像 co-group
这样的操作符的变体,但是如果能定义,测试它们,然后再也不去想这些逻辑怎么实现的,那就更好了。
这对更复杂的操作也是很有用的。对于集合数据我们有 sum
,avg
,min
,max
,sd
和 quantiles
这些可重用的统计操作符,还有 pivot
这样的操作符可以把多维数据分组然后对每组计数。
这些操作本身都是简单的操作,但是当你把它们从你的查询中抽象出来之后,你的查询也会变的简单很多。这时候你可以花更多的时间去想怎么解决问题,而不是每次都重复写基本的统计方法。
为什么用 Pig?
我们选择 Pig 是因为我们不想把 Pig 已有的优化的逻辑重写一遍,不考虑语言层面的东西的话,Pig 在移动大数据方面做得很好。我们的策略是利用 Pig 的 DataByteArray 二进制格式来移动序列化的 Clojure 数据。在大多数情况下,Pig 不需要知道数据的底层展现形式。Byte array 可以很快的做比较,这样对于 join 和 group 操作,Pig 只需要简单的比较序列化的二进制,如果序列化的输出一致,在 Clojure 中值就相等。不过这对于数据排序不适用。二进制的排序其实没什么用,而且和原始数据的排序结果也不一样。要想排序,还得把数据转化回去,而且只能对简单类型排序。这也是 Pig 强加给 PigPen 的为数不多的一个缺陷。
我们在决定做 PigPen 之前也评估过其他语言。第一个要求就是那必须是一门编程语言,并不是一种脚本语言加上一堆 UDF。我们简单看过 Scalding,它看上去很有前途,但是我们的团队主要是用的 Clojure。 可以这么说,PigPen 对于 Clojure 就像是 Scalding 对于 Scala。Cascalog 是用 Clojure 写 map-reduce 通常会用的语言,但是从过去的经验来看,Cascalog 对于日常工作其实没什么用,你需要学一套复杂的新语法和很多概念,通过变量名对齐来做隐式 join 也不是理想的方案,如果把操作符顺序弄错了会造成很大的性能问题,Cascalog 会 flatten 数据结果(这可能很浪费),而且组合查询让人感觉很别扭。
我们也考虑过对 PigPen 用一门宿主语言。这样也能在 Hive 之上构建类似的抽象,但是对每个中间产物都定义 schema 跟 Clojure 的理念不符。而且 Hive 类似与 SQL,使得从功能性语言翻译更难。像 SQL 和 Hive 这样的关系模型语言与像 Clojure 和 Pig 这样的功能性语言之间有着巨大的差。最后,最直接的解决办法就是在 Pig 之上做一层抽象。
到此,关于“Clojure的Map-Reduce怎么理解”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
分享名称:Clojure的Map-Reduce怎么理解
文章出自:http://pcwzsj.com/article/ipcgpg.html