最近在spark-stream上写了一些流计算处理程序,程序架构如下
程序运行在Spark-stream上,我的目标是kafka、Redis的参数都支持在启动时指定。
用scala实现的
Redis服务器的地址是写死的,我的程序要挪个位置,要重新改代码编译。
当时倒腾了一些时间,现在写出来和大家分享,提高后来者的效率。
如上图Spark是分布式引擎,Driver中创建的Redis Pool,在Worker上又得重新创建,参考文章中是定义一个Redis连接池管理类,Redis Pool是类的静态变量,类加载时由JVM自动创建。这个和我的预期有差距。
在Driver中创建Redis管理对象,然后将该对象广播,然后在Worker上获取该广播对象,从而实现参数可变,但是Redis管理对象在每个Worker上又只实例化了一次。
Driver
Driver 指定序列化方式,Spark支持两种序列化方式,Java 和 Kryo,Kryo更高效。
资料上说Kryo方式需要注册类,但是我没有注册也能成功运行。
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: kafka_spark_redis
"
"
"
System.exit(1);
}
/* 解析参数 */
String brokers = args[0];
String topics = args[1];
String redisServer = args[2];
// 创建stream context,两秒钟的数据算一批
SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis");
// sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列号速度没有Kryo速度快
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// sparkConf.set("spark.kryo.registrator", "MyRegistrator");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
JavaSparkContext sc = jssc.sparkContext();
HashSet
HashMap
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("group.id","kakou-test");
//Redis连接池管理类
RedisClient redisClient = new RedisClient(redisServer);//创建redis连接池管理类
//广播Reids连接池管理对象
final Broadcast
// 创建流处理对象
JavaPairInputDStream
jssc,
String.class, /* kafka key class */
String.class, /* kafka value class */
StringDecoder.class, /* key 解码类 */
StringDecoder.class, /* value 解码类 */
kafkaParams, /* kafka 参数,如设置kafka broker */
topicsSet /* 待消费的topic名称 */
);
// 将行分拆为单词
JavaDStream
//@Override
// kafka传来key-value对
public String call(Tuple2
// 取value值
return tuple2._2();
}
});
/* 大量省略 */
........
}
RedisClient
RedisClient 是自己实现的类,在类中重载write/read这两个序列化和反序列化函数,需要注意的是如果是Java Serializer 需要实现其它的接口。
在Driver广播时会触发调用write序列化函数。
public class RedisClient implements KryoSerializable {
public static JedisPool jedisPool;
public String host;
public RedisClient(){
Runtime.getRuntime().addShutdownHook(new CleanWorkThread());
}
public RedisClient(String host){
this.host=host;
Runtime.getRuntime().addShutdownHook(new CleanWorkThread());
jedisPool = new JedisPool(new GenericObjectPoolConfig(), host);
}
static class CleanWorkThread extends Thread{
@Override
public void run() {
System.out.println("Destroy jedis pool");
if (null != jedisPool){
jedisPool.destroy();
jedisPool = null;
}
}
}
public Jedis getResource(){
return jedisPool.getResource();
}
public void returnResource(Jedis jedis){
jedisPool.returnResource(jedis);
}
public void write(Kryo kryo, Output output) {
kryo.writeObject(output, host);
}
public void read(Kryo kryo, Input input) {
host=kryo.readObject(input, String.class);
this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ;
}
}
Worker
在foreachRDD中获取广播变量,由广播变量触发先调用RedisClient的无参反序列化函数,然后再调用反序列化函数,我们的做法是在反序列化函数中创建Redis Pool。
//标准输出,对车辆的车牌和黑名单进行匹配,对与匹配成功的,保存到redis上。
paircar.foreachRDD(new Function2
public Void call(JavaRDD
Date now=new Date();
rdd.foreachPartition(new VoidFunction
public void call(Iterator
String tmp1;
String tmp2;
Date now=new Date();
RedisClient redisClient=broadcastRedis.getValue();
Jedis jedis=redisClient.getResource();
......
redisClient.returnResource(jedis);
}
});
结语
Spark对分布式计算做了封装,但很多场景下还是要了解它的工作机制,很多问题和性能优化都和Spark的工作机制紧密相关。
茶杯头甜蜜终章dlc 官方手机版v1.0.0.3
下载火柴人传说暗影格斗内置菜单 最新版v3.0.1
下载荒野乱斗测试服 安卓版v61.10.3
下载荒野乱斗彩虹服 安卓版v61.10.3
下载寒霜启示录 安卓版v1.25.10
寒霜启示录是一款生存模拟游戏,不少玩家可能对于末日都有着自己
末日城堡免广告版 安卓最新版v0.7.1
末日城堡免广告版是一款非常好玩的模拟经营类游戏,内部可以不看
甜蜜人生模拟器 最新版v1.4.5
甜蜜人生模拟器是一款非常好玩的模拟恋爱手游,玩家在这里能够对
武器锻造师内置功能菜单 v10.4
武器锻造师内置菜单版是游戏的破解版本,在该版本中为玩家提供了
开放空间overfield 安卓版v1.0.5
开放空间Overfield是一款箱庭养成经营手游,让你在广阔