Contents

Data Warehouse: ClickHouse With Flink

There are systems that can store values of different columns separately, but that can’t effectively process analytical queries due to their optimization for other scenarios. Examples are HBase and BigTable. You would get throughput around a hundred thousand rows per second in these systems, but not hundreds of millions of rows per second.

MergeTree 选型

ReplacingMergeTree

在 MergeTree 的基础上,添加了 “处理重复数据” 的功能,该引擎和 MergeTree 的不同之处在于它会删除具有相同主键的重复项。使用 order by 使用的字段来规定去重字段。

SummingMergeTree

在 MergeTree 的基础上,添加了 “合并重复数据” 的功能,会把具有相同主键的行合并为一行,该行包含了被合并的行中具有数值数据类型的列的汇总值。

选型

ReplacingMergeTree
  • 使用 SummingMergeTree 不能保证数据准确性,如中断重启数据会重新进来合并。
  • 解决数据合并的方式有很多。

业务:

1
2
3
4
5
6
7
result.addSink(JdbcSink.sink());
//question: Phoenix 也是 JDBC,为什么没有用 JdbcSink?
//          因为之前访问 Hbase 的表都不一样,表字段也不一样。( phoenix 可以用 自定义的 MySQLSink 或 JDBC)
//          CH 数据来源唯一

//使用工具类
result.addSink(ClickHouseUtil.getSink("insert into line(?, ?, ?)"))

工具类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ClickHouseUtil{
    public static <T> SinkFunction<T> getSink()String sql{
        return JdbcSink.<T>sink(sql,
                               new JdbcStatementBuilder<T>(){
                                   @Override
                                   public void accept(PreparedStatement preparedStatement, T t) throws SQLException{
                                       //获取所有属性信息
                                       Field[] fields = t.getClass().getDeclaredFields();
                                       for (int i = 0; i < fields.length; i++){
                                           Field field = fields[i];
                                           //反射 获取值
                                           Object value = field.get(t);
                                           //给预编译 SQL 对象赋值
                                           preparedStatement.setObject(i + 1, value);
                                       }
                                   }
                               },
                               new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
                               new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withDriverName("")
                                .withUrl("")
                                .build());
    }
}

Kafka2CH

Goods topic from Kafka2CH

1
2
3
4
5
6
7
8
//1.get exeEnv
//2.read kafka all topic, create stream
//3.uniform all stream format
//4.union all stream
//5.get tm from data and create WaterMark
//6.group by, window, reduce. (按 id 分组,10秒滚动窗口,增量聚合(累加值)和全量聚合(提取窗口信息))
//7.join dimension info
//8.wite stream to CH

Sugar 大屏展示