使用spark遇到的国内最大的spark 论坛困难,如何解决的

Spark实战高手之路-从零开始_图文_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
Spark实战高手之路-从零开始
阅读已结束,下载文档到电脑
想免费下载更多文档?
定制HR最喜欢的简历
下载文档到电脑,方便使用
还剩399页未读,继续阅读
定制HR最喜欢的简历
你可能喜欢966,690 七月 独立访问用户
语言 & 开发
架构 & 设计
文化 & 方法
您目前处于:
Spark Streaming中流式计算的困境与解决之道
Spark Streaming中流式计算的困境与解决之道
0&他的粉丝
日. 估计阅读时间:
智能化运维、Serverless、DevOps......2017年有哪些最新运维技术趋势?!
相关厂商内容
相关赞助商
CNUTCon全球运维技术大会,9月10日-9月11日,上海&光大会展中心大酒店,
在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:
Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;
Processing time& :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。
上图中 time1,time2, time3等是我们Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在time3的batch中包含event time 为2的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。
在实时处理过程中也就产生了两个问题:
Spark streaming 从Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据
同一个时间的数据可能出现在多个 batch 中
针对第一个问题, 一个 batch 中包含多个时间区间的数据, 加入我们的区间粒度是5分钟, 那么一个batch钟有可能包含 0~5 时间区间中的部分数据,& 也有可能包含 5~10 时间区间中的部分数据, 这个很好处理,我们先对时间进行向下5分钟取整,然后使用取整后的时间分为多组, 然后计算出来指标,` select time, count(*) group by& 取整(time),就算出来了这个batch中每个时间区间中的数据。
但是对于第二个问题,就很麻烦, 图中举例, 时间区间中 2 出现在了 time2 和time3,& 我们需要在两个batch中计算出2 的指标, 然后进行累计,& 这个累计的过程, 你可以在内存中保存状态, 使用Spark streaming 中的 UpdateStateByKey等算子, 但是不推荐这样使用, 这样就在你的应用中引入了状态和Checkpoint机制, 还有一个方法, 就是把这个状态放在持久化存储中, 比如每次都在 Redis, 或者Hbase 中进行累计,Spark 从 Kafka 拉取日志是可以做到 至少消费一次,但是这种模式 很难保证 exact once 。
假如有下面一种情形,
(点击放大图像)
就会存在这种情况, 我们对 job1 执行 Checkpoint 操作, 然后 job1 被调度执行, 从Kafka 拉取数据处理, 然后结果保存在HBase 中, 保存了一半, 机器挂了, 如果重启,recover, 这时候 job1 就会被重复执行, Kafka 中的数据就会被重复消费, HBase中的部分指标也就多加了一份,虽然我们可以使用 Spark 或者 Flink 中提供的 Watermark 功能。
(点击放大图像)
也就是维护一个窗口, 然后设置一个最大等待时间, T1 ~T4 中的数据到了最大等待时间后就会触发计算,但是这样也会有问题, 如果部分数据的延迟超过了最大等待时间,& 这部分数据也就永远的丢失了。
当然如果业务可以容忍, 那么使用这个功能也是可以的,每次都使用 全量覆盖操作。
以上我们面临的问题是 Spark streaming + Kafka 组合可以保证at lease once ,但是很难保证 exact once, 也就是会重复消费, 我们得想办法做到去重, 计算结果 落地存储会有两种模式:
append 增量的模式, 也就是每次都做累加
complete的模式, 也即是保证幂等性, 每次都是覆盖, 保证没有副作用
因为同一个时间的数据可能出现在多个 batch 中,所以我们在准实时计算中, 只能是append 模式, 上文我们已经论证过了,这种模式会出现重复消费的问题。
由于机器挂了的现象是偶发的, 所以我们可以在挂掉后,& 对数据进行离线修复, 也就是我们要保证有一份全量的离线数据。
这份数据我们要保证是不漏不多, 而且是按照event time 时间区间分开的, 这样我们就可以针对出问题的时间区间, 加载这个时间区间的离线数据, 算出结果, 然后进行覆盖。这样就保证了数据的准确性。
我们落地的数据的特点是:
全量的,不漏不多
按照定义的时间区间分片
因为从Kafka 中拉取存储能保证不丢, 这里我们考虑如何去重, 首先我们要对消息能有一个唯一 ID, 我们使用Kafka的partition加offset作为这个消息的唯一ID, 如果存储到HBase,& 这样的话在生成一个消息的时候,我们的ID就不会重复,即使你重跑很多次,HBase会自动把它去重。
如果存储到 hdfs,& 我们可以每行数据前面都用 ID 作为头字段, 离线处理的时候根据这个字段先进行去重处理,这样也能保证了 exact once 语义。
我们看下 Spark streaming 存储到HDFS或者HBase 都会调用 saveAsHadoopDataset。
val writer = new SparkHadoopWriter(hadoopConf)
writer.open()
Utils.tryWithSafeFinallyAndFailureCallbacks {
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}(finallyBlock = writer.close())
这里根据你传入的 OutFormat 调用 getwriter。
(点击放大图像)
然后再 writer上调用 open write close commit 方法。
这里如果是 HBase& 就是调用 HBase client 的写入方法:
用户提交put请求后,HBase客户端会将put请求添加到本地buffer中,符合一定条件就会通过AsyncProcess异步批量提交。HBase默认设置autoflush=true,表示put请求直接会提交给服务器进行处理;用户可以设置autoflush=false,这样的话put请求会首先放到本地buffer,等到本地buffer大小超过一定阈值(默认为2M,可以通过配置文件配置)之后才会提交。很显然,后者采用group commit机制提交请求,可以极大地提升写入性能,但是因为没有保护机制,如果客户端崩溃的话会导致提交的请求丢失。
在提交之前,HBase会在元数据表.meta.中根据rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的。如果是批量请求的话还会把这些rowkey按照HRegionLocation分组,每个分组可以对应一次RPC请求。
HBase会为每个HRegionLocation构造一个远程RPC请求MultiServerCallable,然后通过rpcCallerFactory.newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。
这里如果是 HDFS 文件写入:
首先根据 TaskAttemptID构造出来一个临时写入路径,构造一个文件流
写入临时写入路径
commit 的时候调用 commitTask 根据目标路径是否存在, 如果已经存在就删除临时文件,报错, 如果不存在就直接 rename, 把临时文件名, 改为目标文件名, 这里主要是防止多个分区写入同一个目标文件,导致的冲突。
多文件分组输出
如果有一个需求,需要把数据根据不同的key输出到不同的文件中, 上文中,我们先根据 batch 进行分组, 然后不同分组的文件输出到不同的文件,这时候就需要用到MultipleOutputFormat
TreeMap&String, RecordWriter&K, V>> recordWriters = new TreeMap&String, RecordWriter&K, V>>();
K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
RecordWriter&K, V> rw = this.recordWriters.get(finalPath);
if (rw == null) {
rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);
this.recordWriters.put(finalPath, rw);
rw.write(actualKey, actualValue);
这里就是维护了一个TreeMap, 里面每个不同的key, 构造一个 writer,& 这个writer& 是getBaseRecordWriter -& theTextOutputFormat.getRecordWriter根据临时路径构造出一个输出流, 包装为一个& LineRecordWriter 最终的 writer就是在这个 DataOutputStream 上进行输出,
上层多文件输出根据不同的key, 从treeMap上获取到不同的文件输出流, 然后进行多文件输出。
这里会存在一个问题,& 同一个时间的数据可能出现在多个 batch 中, 就是会产生很多小文件,HDFS 对小文件支持很差,我们需要合并小文件,但是我们也可以直接在输出的时候进行 append 操作,就直接避免了产生小文件。
这里就需要改源码了。
(点击放大图像)
上面的类图可以清楚的显示类图的关系,& MultipleOutputFormat 的writer 会调用子类的 getBaseRecordWriter, 我们可以在这里改写一下,& 使用我们自己的 TextOutputFormatNew 的 getRecordWriterNew 方法, 在方法里面构造输出流的时候, 如果文件已经存在,就进行 append 操作。
val fileOut: FSDataOutputStream = if (HDFSFileService.existsPath(file)) {
println(&appendfile&)
fs.append(file)
println(&createfile&)
fs.create(file, progress)
def getTaskOutputPath(job: JobConf, iname: String): Path = {
val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)
val completePath = name + &/& + iname
val path = new Path(completePath)
把构造临时路径的方法也修改了, 强制不产生临时路径, 每次都往同一个文件中进行 append, 这样就达到了目的。
本文提供的解决方案, 在不修改Spark 源码本身的前提下, 进行了一些必要的扩展,& 其实本质上来讲, 就是我们假定异常状况是经常发生的, 我们就要面对它,就是要对输入流 kafka 中的原始数据进行唯一标识,保证可以去重,然后持久化。 对发生异常的时间区间, 进行数据重放,就像数据中用 redo 日志进行重放一样。
孙彪彪,目前在七牛云存储从事 Spark 相关工作,对 Spark 有深入的研究和实践,微信公众号:Spark技术分享。email: 。
Author Contacted
告诉我们您的想法
允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p
当有人回复此评论时请E-mail通知我
允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p
当有人回复此评论时请E-mail通知我
允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p
当有人回复此评论时请E-mail通知我
赞助商链接
InfoQ每周精要
订阅InfoQ每周精要,加入拥有25万多名资深开发者的庞大技术社区。
架构 & 设计
文化 & 方法
<及所有内容,版权所有 &#169;
C4Media Inc.
服务器由 提供, 我们最信赖的ISP伙伴。
北京创新网媒广告有限公司
京ICP备号-7
找回密码....
InfoQ账号使用的E-mail
关注你最喜爱的话题和作者
快速浏览网站内你所感兴趣话题的精选内容。
内容自由定制
选择想要阅读的主题和喜爱的作者定制自己的新闻源。
设置通知机制以获取内容更新对您而言是否重要
注意:如果要修改您的邮箱,我们将会发送确认邮件到您原来的邮箱。
使用现有的公司名称
修改公司名称为:
公司性质:
使用现有的公司性质
修改公司性质为:
使用现有的公司规模
修改公司规模为:
使用现在的国家
使用现在的省份
Subscribe to our newsletter?
Subscribe to our industry email notices?
我们发现您在使用ad blocker。
我们理解您使用ad blocker的初衷,但为了保证InfoQ能够继续以免费方式为您服务,我们需要您的支持。InfoQ绝不会在未经您许可的情况下将您的数据提供给第三方。我们仅将其用于向读者发送相关广告内容。请您将InfoQ添加至白名单,感谢您的理解与支持。· · · · · ·
· · · · · ·&#xe621; 上传我的文档
&#xe602; 下载
&#xe60c; 收藏
该文档贡献者很忙,什么也没留下。
&#xe602; 下载此文档
Spark实战高手之路(4):构建Spark集群
下载积分:1000
内容提示:Spark实战高手之路(4):构建Spark集群
文档格式:PDF|
浏览次数:1|
上传日期: 10:11:02|
文档星级:&#xe60b;&#xe60b;&#xe60b;&#xe60b;&#xe60b;
全文阅读已结束,如果下载本文需要使用
&#xe71b; 1000 积分
&#xe602;下载此文档
该用户还上传了这些文档
Spark实战高手之路(4):构建Spark集群
官方公共微信用户名:lefteva
文章数:22
访问量:2937
注册日期:
阅读量:1297
阅读量:3317
阅读量:448610
阅读量:1133473
51CTO推荐博文
&&&&上篇文章中讲完了如何配置免密码登录的问题,现在讲述下,三个节点的环境配置过程。所需要的hadoop-2.7.3.tar.gz 、 jdk-7u79-linux-x64.tar.gz 、 scala-2.11.6.tgz 、 spark-2.0.1-bin-hadoop2.7.tgz 可以获取,资源存放在百度云盘。首先需要在三个节点中分别创建spark目录master节点、worker1节点、worker2节点同时执行:下面以master节点为例,部分操作worker1与worker2不需要执行,不需要worker1,worker2执行的将给出注释,请仔细看清。spark@master:~/.ssh$&cd&..
spark@master:~$&mkdir&spark注意所创建的spark目录属于spark用户,hadoop组spark@master:~$&cd&spark/在这里要介绍一个工具winSCP,功能是能够在windows 与ubuntu 之间传递文件,之所以不推荐使用lrzsz包中的rz 进行传递是因为rz只能够传递比较小的文件,对于大的文件,使用这个将会传递失败,有趣的是可以使用命令sudo rz 进行传递,但是,当上传之后,你可以看到所上传的文件所属的用户将变成了root,这将会导致后面配置的错误。因此,必须使用winSCP进行传递。winSCP也在刚才的百度云盘中。上图显示了winSCP工具界面,根据自己的文件目录进行上传,或者可以直接拖依次对上传的包进行解压,注意:在worker1和worker2中只需上传jdk-7u79-linux-x64.tar.gz 、 scala-2.11.6.tgz ,而在master中则要全部上传四个文件spark@master:~/spark$&tar&-zxvf&hadoop-2.7.3&&&#只在master节点执行spark@master:~/spark$&tar&-zxvf&jdk-7u79-linux-x64.tar.gz&&&#三个节点都要执行spark@master:~/spark$&tar&-zxvf&scala-2.11.6.tgz&&&&#三个节点都要执行spark@master:~/spark$&tar&-zxvf&spark-2.0.1-bin-hadoop2.7.tgz&&&&#只在master节点执行解压之后,为方便调用,建立软连接spark@master:~/spark$&ln&-s&hadoop-2.7.3&hadoop&&&&#只在master节点执行
spark@master:~/spark$&ln&-s&jdk1.7.0_79/&jdk&&&&&&&#三个节点都要执行
spark@master:~/spark$&ln&-s&scala-2.11.6&scala&&&&&&#三个节点都要执行
spark@master:~/spark$&ln&-s&spark-2.0.1-bin-hadoop2.7&spark&&&&#只在master节点执行切换到root下进行环境配置spark@master:~/spark$&sudo&su
[sudo]&password&for&spark:&
root@master:/home/spark/spark#&vim&/etc/profile在文件最底部添加:export&JAVA_HOME=/home/spark/spark/jdk&&&&&&#三个节点都要添加
export&SCALA_HOME=/home/spark/spark/scala&&&&&#三个节点都要添加
export&HADOOP_HOME=/home/spark/spark/hadoop&&&#三个节点都要添加
export&SPARK_HOME=/home/spark/spark/spark&&&&&&#三个节点都要添加
export&CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export&HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export&PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin保存,使其生效,并退回到spark用户root@master:/home/spark/spark#&source&/etc/profile
root@master:/home/spark/spark#&exit
spark@master:~/spark$至此可以查看下java环境,scala,hadoop 环境是否已经安装成功spark@master:~/spark$&java&-version
java&version&"1.7.0_79"
Java(TM)&SE&Runtime&Environment&(build&1.7.0_79-b15)
Java&HotSpot(TM)&64-Bit&Server&VM&(build&24.79-b02,&mixed&mode)java环境已经配置成功spark@master:~/spark$&scala&-version
Scala&code&runner&version&2.11.6&--&Copyright&,&LAMP/EPFLscala已经配置成功至此上述是三个节点同时执行的。接下来,master节点上的配置spark@master:~/spark$&hadoop&version
Hadoop&2.7.3
Subversion&https://git-wip-us.apache.org/repos/asf/hadoop.git&-r&baa91f7c6bc9cb92bec1c8af91ccff
Compiled&by&root&on&T01:41Z
Compiled&with&protoc&2.5.0
From&source&with&checksum&2e4ce5f957ea4db193bce
This&command&was&run&using&/home/spark/spark/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jarhadoop环境成功接下来开始配置hadoopspark@master:~/spark$&cd&hadoop/etc/hadoop/
spark@master:~/spark/hadoop/etc/hadoop$&&vim&slaves删除里面内容,并添加一下内容:保存即可依照下述命令进行文件的更改spark@master:~/spark/hadoop/etc/hadoop$&vim&hadoop-env.sh添加或更改文件中相关的变量,本人在这个地方踩了不少坑,如果不添加,会在后面报错。添加完毕后,记得保存。spark@master:~/spark/hadoop/etc/hadoop$&vim&core-site.xml添加内容到&configuration&内容&/configuration&内容如下:&property&
&&&&&&&&&name&fs.default.name&/name&
&&&&&&&&&value&hdfs://master:9000&/value&
&&&&&&&&&description&The&name&of&the&default&file&system.&&A&URI&whose&scheme&and&authority&determine&the&FileSystem&implementation.&&The&uri's&scheme&determines&the&config&property&(fs.SCHEME.impl)&naming&the&FileSystem&implementation&class.&The&uri's&authority&is&used&to&determine&the&host,&port,&etc.&for&a&filesystem.
&&&&&&&&&/description&
&&&&&/property&
&&property&
&&&&&&&&&name&hadoop.tmp.dir&/name&
&&&&&&&&&value&/home/spark/spark/hadoop/tmp&/value&
&&&&&&&&&description&A&base&for&other&temporary&directories.&/description&
&&&&&/property&spark@master:~/spark/hadoop/etc/hadoop$&vim&hdfs-site.xml添加内容到&configuration&内容&/configuration&内容如下:&property&
&&&&&&&&&name&dfs.replication&/name&
&&&&&&&&&value&3&/value&
&&&&&&&&&description&Default&block&replication.The&actual&number&of&replications&can&be&specified&when&the&file&iscreated.The&default&is&used&if&replication&is&not&specified&in&create&time.
&&&&&&&&&/description&
&&&&&/property&spark@master:~/spark/hadoop/etc/hadoop$&vim&yarn-site.xml添加内容到&configuration&内容&/configuration&内容如下:&property&
&&&&&&&&&name&yarn.resourcemanager.hostname&/name&
&&&&&&&&&value&master&/value&
&/property&
&property&
&&&&&&&&&name&yarn.nodemanager.aux-services&/name&
&&&&&&&&&value&mapreduce_shuffle&/value&
&/property&spark@master:~/spark/hadoop/etc/hadoop$&vim&mapred-site.xml
spark@master:~/spark/hadoop/etc/hadoop$&cp&mapred-site.xml.template&mapred-site.xml添加内容到&configuration&内容&/configuration&内容如下: &然后执行cp mapred-site.xml.template mapred-site.xml&property&
&&&&name&mapreduce.framework.name&/name&
&&&&&&value&yarn&/value&
&&&&&&&&description&The&runtime&framework&for&executing&MapReduce&jobs.
&&&&&&&&&Can&be&one&of&local,&classic&or&yarn.默认是local,适合单机
&&&&&&&&/description&
&&&/property&spark@master:~/spark/hadoop/etc/hadoop$&vim&yarn-site.xml添加内容到&configuration&内容&/configuration&内容如下:&property&
&&&&&&&&&name&yarn.resourcemanager.hostname&/name&
&&&&&&&&&value&master&/value&
&/property&
&property&
&&&&&&&&&name&yarn.nodemanager.aux-services&/name&
&&&&&&&&&value&mapreduce_shuffle&/value&
&/property&spark@master:~/spark/hadoop/etc/hadoop$&vim&yarn-env.sh添加以下内容在文件中地开始export&JAVA_HOME=/home/spark/spark/jdk
export&YARN_PID_DIR=/home/spark/spark/hadoop/tmp/pid保存文件,注意有些hadoop参考可能会提前将 /home/spark/spark/hadoop/tmp 文件夹建好,但在此版本中不需要提前创建,,因为在hadoop初始化时,会自动创建,若是提前创建,则有可能会在启动hadoop集群时报错!切换到worker1节点中执行spark@worker1:~/spark$&scp&-r&spark@master:/home/spark/spark/hadoop&./hadoop注意:./hadoop,代表将master中spark用户下的/home/spark/spark/hadoop复制为hadoop,此名称要跟之前在/etc/profile中设置的hadoop环境变量名称一致。在worker1中做下测试。spark@worker1:~/spark$&hadoop&version
Hadoop&2.7.3
Subversion&https://git-wip-us.apache.org/repos/asf/hadoop.git&-r&baa91f7c6bc9cb92bec1c8af91ccff
Compiled&by&root&on&T01:41Z
Compiled&with&protoc&2.5.0
From&source&with&checksum&2e4ce5f957ea4db193bce
This&command&was&run&using&/home/spark/spark/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar显示成功切换到worker2节点中spark@worker2:~/spark$&scp&-r&spark@master:/home/spark/spark/hadoop&./hadoop在worker2下做下测试。spark@worker2:~/spark$&hadoop&version
Hadoop&2.7.3
Subversion&https://git-wip-us.apache.org/repos/asf/hadoop.git&-r&baa91f7c6bc9cb92bec1c8af91ccff
Compiled&by&root&on&T01:41Z
Compiled&with&protoc&2.5.0
From&source&with&checksum&2e4ce5f957ea4db193bce
This&command&was&run&using&/home/spark/spark/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar显示成功初始化hadoop集群spark@master:~/spark/hadoop/etc/hadoop$&hadoop&namenode&-format若红色方框中的status为0则代表初始化成功,若为1,则为失败启动集群spark@master:~/spark/hadoop/etc/hadoop$&$HADOOP_HOME/sbin/start-all.sh然后在浏览器中输入&& &master_ip代表master的ip地址端口号为50070至此,hadoop集群已全部安装完毕。&我们将在下一篇文章中,进一步安装spark集群。。。。本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)

我要回帖

更多关于 sparksql 最大值 的文章

 

随机推荐