
– Good afternoon, everyone. I'm glad to have the opportunity to give this presentation, and thank you for being here. Today's topic is Simplify CDC pipeline with Spark Streaming SQL and Delta Lake. First of all, please allow me to introduce myself. I am a staff engineer on the Alibaba Cloud E-MapReduce product team. I'm a Spark contributor focused on Spark SQL, and I am also a HiveOnDelta contributor. My presentation is in three parts. Firstly, I'm going to take a look at what CDC is. Secondly, I will propose our CDC solution using Spark Streaming SQL and Delta Lake. Finally, I will introduce some future work. First, let's take a look at what CDC is.

If you are a data engineer, you have probably already come across the term CDC. CDC is short for Change Data Capture. It is an approach to data integration that is based on the identification, capture, and delivery of the changes made to data sources. CDC can help to load a source table into your data warehouse or data lake. Here is our CDC pipeline for a database. There is lots of data stored in the database or application source, and we want to analyze these tables. It is not proper to run the queries against the database directly, because that will affect the application's performance. So we can use CDC to load the tables into an external data warehouse or data lake; after that, we can do ETL or ad hoc queries on the target tables stored in the data warehouse. There are a lot of CDC solutions, including incremental import jobs and real-time jobs.

Sqoop is an open source tool that transfers data between Hadoop and relational databases. You can create a daily scheduled Sqoop incremental import job to load the data into your data warehouse, but the Sqoop solution has some drawbacks. For example, it still puts load pressure on the source database, which will affect the application's performance, and batch jobs scheduled hourly or daily can't meet the need for real-time analysis. Additionally, the Sqoop incremental solution also places some limitations on the source table, such as needing a last-modified timestamp column. And Sqoop can't handle deleted rows: because each subsequent incremental job only imports the updated or inserted rows that are newer than those previously imported, it can't capture the deleted rows. And we have to run some DDL on the Hive table manually if the schema of the source table changes.
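To make the deleted-rows limitation concrete, here is a minimal Python sketch of a timestamp-based incremental import. It is only a model of the idea, not Sqoop itself: the `source` and `target` dictionaries, the `last_modified` field, and the `incremental_import` function are all hypothetical names chosen for illustration.

```python
from datetime import datetime

# Hypothetical source table keyed by id; each row carries a last-modified timestamp.
source = {
    1: {"name": "alice", "last_modified": datetime(2020, 6, 1)},
    2: {"name": "bob", "last_modified": datetime(2020, 6, 1)},
}


def incremental_import(source, target, last_value):
    """Copy only the rows modified after last_value and return the new checkpoint."""
    for key, row in source.items():
        if row["last_modified"] > last_value:
            target[key] = dict(row)
    newest = max((row["last_modified"] for row in source.values()), default=last_value)
    return max(newest, last_value)


target = {}
checkpoint = incremental_import(source, target, datetime.min)

# Changes on the source after the first import: one update, one insert, one delete.
source[1] = {"name": "alice2", "last_modified": datetime(2020, 6, 2)}
source[3] = {"name": "carol", "last_modified": datetime(2020, 6, 2)}
del source[2]

checkpoint = incremental_import(source, target, checkpoint)

# Rows that no longer exist in the source but are still present in the target.
stale_keys = sorted(set(target) - set(source))
print(stale_keys)
```

The update and the insert arrive in the target, but the deleted row stays there, because a deleted row has no newer timestamp for the import to pick up.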

Another CDC solution is using the binlog of the database. The binlog is a set of sequential log files that record insert, update, and delete operations. For a streaming CDC pipeline using the binlog, first, we can use some open source tools, like Canal or Maxwell, to sync the binlog to Kafka. After that, Spark Streaming consumes the topic from Kafka, parses the binlog records, and writes them to a target storage system which supports insert, update, and delete, like Kudu, Delta, or HBase. If you want to replay the binlog into Hive, you have to implement more complex merge logic by yourself. This solution also has some drawbacks. For example, HBase and Kudu are both heavy services and come with a lot of operational cost. If the data is too large, Kudu has some performance and stability problems, and HBase cannot support high-throughput analytics.

We have talked about the above two CDC solutions, one in batch mode using Sqoop and another in streaming mode using the binlog, but they both have drawbacks. Here, we propose our CDC solution using Spark Streaming SQL and Delta Lake. We can use Streaming SQL to parse the binlog and merge it into Delta Lake. Next, I will introduce Spark Streaming SQL first. SQL is a declarative language. Almost all data engineers have SQL skills, especially in databases and data warehouses, such as MySQL, Hive SQL, Spark SQL, etc. SQL makes data analysis easier and more effective, and users can focus on their business logic. Lots of streaming engines provide a SQL language, like KSQL, Flink SQL, etc. By using Streaming SQL, even users who are not familiar with Spark Streaming, or who have not learned the Java or Scala programming languages, can easily develop streaming processing jobs.

Additionally, it is also low cost if you want to migrate from a batch SQL job to a Streaming SQL job. In the Spark community, there is also a related JIRA which discusses supporting Streaming SQL in Spark, and our team is also involved in this SPIP. Furthermore, we have implemented our own version of Streaming SQL and integrated it into our product, EMR in Alibaba Cloud. The figure on the left shows where Streaming SQL sits in the Spark engine stack. It is on top of Structured Streaming.

We provide some DDL and DML. For DDL, we support CREATE TABLE, CREATE TABLE AS SELECT, CREATE SCAN, and CREATE STREAM. For DML, we support INSERT INTO and MERGE INTO, and we also support some other operators, like SELECT, WHERE, GROUP BY, JOIN, and UNION ALL. UDFs are also supported. Beyond that, Structured Streaming supports lots of sinks and sources, and we have added support for more sinks and sources, such as some storage products in Alibaba Cloud like Loghub, Tablestore, etc., and others like Kudu. We introduced some new keywords in DDL and DML. Next, I will pick some of them to explain why we need them and how to use them. The first one is CREATE SCAN. The CREATE SCAN syntax defines how to read a table.

Why should we introduce a scan to define how to read a table, instead of reading the created data source table directly? Because a table created by Spark SQL is just a definition of a data source. If we support both streaming SQL and batch SQL, Spark cannot determine whether a SQL statement on this table is a batch query or a streaming query, so we introduced the CREATE SCAN syntax. Take Kafka for example: as you know, a Kafka data source table can be used in both streaming and batch processing. In batch mode, we can use Spark SQL to query the Kafka data source table directly, but for streaming SQL, we must create a scan to tell the Spark engine that it is a stream-type query. Here is the detailed scan syntax: CREATE SCAN defines a named alias on a data source table and tells the query type with the keyword USING. Additionally, we can also set runtime parameters of this scan in the OPTIONS clause, such as maxOffsetsPerTrigger.

A scan created by CREATE SCAN is a temporary runtime view on top of the data source table. It is not saved in the metastore, and it will be deleted after the Spark session exits. A scan cannot be used to define a target table. So if we want to use batch mode, we can query the data source table directly, or we can create a batch scan on top of the table. If we want to use streaming mode, we must create a stream scan on top of the table. After creating a stream scan on top of the Kafka data source table, we can use DML SQL to process the streaming data source. For example, we can select from the scan and insert the result into a target table, like a Delta table.

For a Spark streaming job, there are also runtime job parameters like checkpoint location, output mode, etc. How should we set these parameters for the job through SQL? Here, we use the CREATE STREAM syntax. A stream represents a Structured Streaming job; it binds the job parameters together with the DML SQL. Here is the detailed syntax: it creates a Kafka test stream job with an OPTIONS clause, which is used to set the runtime stream job parameters, followed by the DML operations on the Kafka test scan. Additionally, it is allowed to run multiple stream jobs in one application, so we support creating multiple streams in one SQL file. Every CREATE STREAM has its own job parameters in its own OPTIONS clause, which is very effective and clear.

The MERGE INTO statement is the combination of insert, update, and delete. You can specify a condition to determine whether to update data in or insert data into the target table. Here is the MERGE INTO syntax: merge into a target table from the source data defined in the USING clause. The source in the USING clause can be a created stream scan or a subquery, and we should also provide the merge condition to determine whether to update, delete, or insert into the target table.
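The semantics of MERGE INTO can be sketched in plain Python against an in-memory table. This is only an illustration of the matched / not-matched decision, not the engine's implementation: `merge_into`, the `op` field, and the choice to apply rows one by one and to insert an unmatched update are assumptions made for the example.

```python
def merge_into(target, source_rows, key):
    """Apply change rows to `target`, a dict of rows keyed by the `key` column.

    Each source row has an "op" field (insert, update, or delete) plus column
    values. The branches mirror WHEN MATCHED / WHEN NOT MATCHED clauses.
    """
    for row in source_rows:
        k = row[key]
        data = {col: val for col, val in row.items() if col != "op"}
        matched = k in target  # the merge condition: target.key = source.key
        if matched and row["op"] == "update":
            target[k] = data              # WHEN MATCHED AND op = update
        elif matched and row["op"] == "delete":
            del target[k]                 # WHEN MATCHED AND op = delete
        elif not matched and row["op"] != "delete":
            target[k] = data              # WHEN NOT MATCHED: insert
        # every other combination leaves the target unchanged
    return target


target = {
    1: {"id": 1, "name": "a"},
    2: {"id": 2, "name": "b"},
}
changes = [
    {"op": "update", "id": 1, "name": "a2"},
    {"op": "delete", "id": 2},
    {"op": "insert", "id": 3, "name": "c"},
]
merge_into(target, changes, key="id")
print(target)
```

After the merge, row 1 carries the new name, row 2 is gone, and row 3 has been added.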

We can use MERGE INTO to replay the binlog. Currently, we support Delta and Kudu as the target table. This command can simplify the implementation of CDC in streaming mode, and this is a simple example. UDFs are an important part of data analysis. They extend the ability to process data using SQL. Spark Streaming SQL also supports Spark SQL UDFs and Hive UDFs, and it also supports some window functions. Window functions let a streaming job do aggregation over event-time windows, and we support two window types. One is the tumbling window, which means the windows in the stream job do not overlap, and the other is the hopping window, or sliding window, which means the windows in the streaming job can overlap.
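The difference between the two window types can be shown with a small Python sketch that assigns an event time to its windows. Times are plain integers in seconds, and the function names are hypothetical; this is not the Streaming SQL window function itself.

```python
def tumbling_window(event_time, size):
    """Return the single [start, end) window that contains event_time."""
    start = event_time - event_time % size
    return (start, start + size)


def hopping_windows(event_time, size, slide):
    """Return every [start, end) window containing event_time.

    Window starts are multiples of `slide`; with slide < size the windows overlap.
    """
    windows = []
    start = event_time - event_time % slide  # latest window start <= event_time
    while start > event_time - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(tumbling_window(125, size=60))            # one window
print(hopping_windows(125, size=60, slide=30))  # two overlapping windows
```

With a 60-second tumbling window, the event at second 125 belongs only to the window from 120 to 180. With a 60-second hopping window that slides every 30 seconds, the same event belongs to both the window from 90 to 150 and the window from 120 to 180.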

The watermark is used to solve the problem of late data. You can define the watermark of a query by specifying the event time column and the threshold on how late the data is expected to be. In the DataFrame API, we can use the withWatermark function to set it. In our Streaming SQL, we can put a delay UDF in the WHERE clause. Here is an example: query data from Kafka and compute an average over a tumbling window with a two-minute watermark. By now, I have introduced Spark Streaming SQL. We can use SQL to implement a Structured Streaming job instead of the Scala or Java API, which is simpler and more effective.
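As a rough model of the watermark example, the following Python sketch computes a per-window average and drops events that arrive later than the allowed delay. It is a simplification under stated assumptions: the watermark here advances after every event rather than per micro-batch, times are integer seconds, and `windowed_average` is a hypothetical helper, not part of Spark.

```python
from collections import defaultdict


def windowed_average(events, window_size, delay):
    """Average values per tumbling window, dropping data older than the watermark.

    events: iterable of (event_time, value) pairs in arrival order.
    The watermark is the maximum event time seen so far minus `delay`.
    """
    sums = defaultdict(lambda: [0.0, 0])  # window start -> [total, count]
    max_event_time = None
    dropped = []
    for event_time, value in events:
        if max_event_time is not None and event_time < max_event_time - delay:
            dropped.append((event_time, value))  # too late, ignore it
            continue
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        start = event_time - event_time % window_size
        sums[start][0] += value
        sums[start][1] += 1
    averages = {start: total / count for start, (total, count) in sorted(sums.items())}
    return averages, dropped


events = [(10, 1.0), (70, 3.0), (20, 3.0), (300, 5.0), (100, 9.0)]
averages, dropped = windowed_average(events, window_size=60, delay=120)
print(averages)
print(dropped)
```

The event at second 20 arrives out of order but within the two-minute delay, so it still counts toward its window. The event at second 100 arrives after an event at second 300, which is more than two minutes late, so it is dropped.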

Next, I will introduce Delta Lake and our work on it. Delta Lake is an open source storage layer. Databricks describes it as a lakehouse, which is a new data management paradigm. It combines the advantages of the data lake and the data warehouse. This figure shows the key features of Delta Lake. Delta Lake is built on the Parquet format and provides some features to support more complex data analysis requirements. For example, Delta Lake has its own metadata management, which can handle petabyte-scale tables with billions of partitions and files at ease.

The most important is that it supports ACID transactions, so we can unify batch and streaming. That is, we can use Spark Streaming to sink data into a Delta table, and at the same time, we can run a batch query on the same table. So it is easy to build a real-time data pipeline. We can also do update, delete, and merge operations on the Delta table. Besides, it supports schema enforcement and evolution, which can provide better data quality and data management. Time travel provides snapshots of the data, so we can query any earlier version of the data. Currently, only Spark can write data to Delta, including batch mode and streaming mode, and Presto, Hive, and Spark can query data from Delta. Our team has also made lots of improvements to Delta Lake. Firstly, we provide SQL support for some operations of Delta Lake, like update, delete, optimize, vacuum, etc., and provide some related DDL and DML. Additionally, we support Hive and Presto querying Delta tables in a way that is different from the implementation of the Delta community. For example, once we create a Delta table, it can be queried by Presto or Hive directly.

In the community version, Presto and Hive both need to have their own tables. Here is an example that shows how we use Presto and Hive to query a Delta table. So far, I have introduced our work on Spark Streaming SQL and Delta Lake. Next, I will introduce our CDC solution using Spark Streaming SQL and Delta Lake. We can get some benefits from this solution. Firstly, using the binlog can eliminate the load pressure on the source database.

Secondly, Delta Lake is just a storage layer. It has no extra services to operate. Thirdly, Spark Streaming SQL can simplify the implementation of replaying the binlog. There's no need to write Java or Scala code. And last, the CDC pipeline provides low-latency data analysis. Changes to the data in the source table can be synced to Delta Lake with minutes of latency, so the user can query the target Delta table immediately.

Next, I will show you how we build the CDC pipeline in detail. First of all, we should sync the binlog of the table to Kafka using Debezium or other similar products. Different tools have different binlog formats, so the binlog parsers are also different. Then we can use Spark Streaming SQL to consume the binlog from Kafka and parse it to get the row record data and the operation type of each record, like insert, update, or delete. Then we can merge the parsed record data into Delta Lake. Here is an example that shows how to use Spark Streaming SQL to build the pipeline step by step. Step one, we should create two tables: one is the source Kafka table, and the other is the target Delta table. Step two, we create a streaming scan on top of the Kafka table and set some parameters in the OPTIONS clause, like startingOffsets and maxOffsetsPerTrigger.

Step three is the major logic of the CDC pipeline. We create a stream to wrap the MERGE INTO statement and the job parameters. The MERGE INTO contains a USING clause, which defines a subquery on the Kafka source table, for example, parsing the binlog, and it also specifies the conditions that determine whether to insert, update, or delete data in the target Delta table. For example, when a record from the source table has a matched record in the target table and the operation type is update, then we should update the target table with the new record data. Step four, we can use the Streaming SQL command to launch the SQL file.
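The binlog parsing done inside the subquery of step three depends on which tool produced the records. As a hedged illustration, the Python sketch below normalizes two simplified record layouts into an operation type and a row. The layouts are modeled loosely on Maxwell-style and Debezium-style JSON messages and are assumptions for this example; real messages carry more fields and may be wrapped differently, and the function names are hypothetical.

```python
import json


def parse_maxwell_style(message):
    """Assumed layout: {"type": "insert|update|delete", "data": {...row...}}."""
    record = json.loads(message)
    return record["type"], record["data"]


# Assumed mapping of single-letter operation codes to operation types.
_OP_CODES = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


def parse_debezium_style(message):
    """Assumed layout: {"op": "c|r|u|d", "before": {...}, "after": {...}}."""
    record = json.loads(message)
    op = _OP_CODES[record["op"]]
    # A delete has no "after" image, so the row comes from "before".
    row = record["before"] if op == "delete" else record["after"]
    return op, row


maxwell_msg = '{"database": "shop", "table": "orders", "type": "update", "data": {"id": 1, "amount": 20}}'
debezium_msg = '{"op": "d", "before": {"id": 1, "amount": 20}, "after": null}'

print(parse_maxwell_style(maxwell_msg))
print(parse_debezium_style(debezium_msg))
```

Both parsers return the same shape, an operation type plus the row, so the merge condition that follows does not need to know which tool wrote the binlog to Kafka.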

The Streaming SQL command will launch a streaming job in yarn-client mode. After that, we have built the CDC streaming pipeline, and we can query the Delta table in Delta Lake whenever there are data changes in the source database table. These figures show the inner logic of the MERGE INTO statement. As you can see, for each batch of the stream, we call Delta's merge function to merge the parsed binlog records into the target Delta table. After we launch our CDC stream job, there are some issues which will affect the job's long-running stability. The first one is how to handle small files. Because we call Delta's merge function for each batch, and the batch interval is commonly about one or several minutes, we will get more and more small files after the job runs for many days. So we must take some action to handle these small files while the job is running. For example, we can increase the batch interval to reduce the number of calls to Delta's merge function.

Compaction is an important tool to merge Delta's small files into large ones. Compaction does not change the data; it just changes the data layout. We support using Spark SQL to run the OPTIMIZE command to do compaction. Additionally, because Delta's merge function launches Spark jobs to do joins, we can use adaptive execution mode to reduce the number of reducer tasks. This can also significantly reduce small files. Next, I will introduce how we do compaction on the long-running stream job. There are two ways to do compaction. One is to launch a scheduled compaction batch job, hourly or daily. This job is just a simple OPTIMIZE SQL command, but when the scheduled compaction job runs, it may cause the streaming job to fail because of Delta transaction conflicts. The left figure shows the timeline of the transaction commits between the streaming batch and the compaction batch job.

First, the streaming batch job reads the Delta table, then the compaction job does its transaction commit. After that, the streaming batch job will do its own transaction commit. It will check whether its commit conflicts with the compaction's transaction commit. If the conflict check fails, the streaming job will fail. The right figure shows Delta's logic for the transaction commit conflict check. There are three conflict types: concurrent append exception, concurrent delete read exception, and concurrent delete delete exception.

For example, according to the left figure, the streaming batch job and the compaction job read the same source data files. The compaction job will delete the source data files and rewrite them to new files without changing the data, while the streaming batch job runs the merge logic and also deletes the same source data files. This will cause a concurrent delete delete exception, because the data deleted by the streaming batch job has been rewritten to new files, so this data is not really deleted. To prevent the streaming pipeline from failing the transaction commit conflict check, we also did some work on this. One is that we fixed a bug: when the streaming batch only contains insert binlog records, it should always succeed in committing the transaction. Another is that we added a retry for the failed batch job if the conflict check fails.
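The conflict between a compaction commit and a streaming batch commit can be modeled with a toy optimistic commit log in Python. This is a deliberately simplified sketch and not Delta's actual implementation: the `Table` class, its methods, and the file names are hypothetical, and append conflicts are left out.

```python
class ConcurrentDeleteDelete(Exception):
    """Another committed transaction already removed a file we want to remove."""


class ConcurrentDeleteRead(Exception):
    """Another committed transaction removed a file we only read."""


class Table:
    def __init__(self, files):
        self.files = set(files)
        self.log = []  # one entry per commit: the set of files it removed

    @property
    def version(self):
        return len(self.log)

    def commit(self, read_version, read_files, removed, added):
        """Commit if nothing committed since read_version conflicts with us."""
        read_files, removed, added = set(read_files), set(removed), set(added)
        for other_removed in self.log[read_version:]:
            if other_removed & removed:
                raise ConcurrentDeleteDelete(sorted(other_removed & removed))
            if other_removed & read_files:
                raise ConcurrentDeleteRead(sorted(other_removed & read_files))
        self.files = (self.files - removed) | added
        self.log.append(removed)


table = Table({"f1", "f2"})
stream_version = table.version  # the streaming batch reads the table first

# The compaction job commits first: it rewrites f1 and f2 into f3.
table.commit(table.version, {"f1", "f2"}, {"f1", "f2"}, {"f3"})

try:
    # The streaming batch merged into f1 and now tries to replace it.
    table.commit(stream_version, {"f1"}, {"f1"}, {"f1b"})
    outcome = "committed"
except ConcurrentDeleteDelete:
    outcome = "conflict"
    # Retry: redo the batch against the latest snapshot, which contains f3.
    table.commit(table.version, {"f3"}, {"f3"}, {"f3b"})

# An insert-only batch reads and removes nothing, so even a stale one commits.
table.commit(0, set(), set(), {"f4"})
print(outcome, sorted(table.files))
```

In this model the first streaming commit is rejected because the compaction already removed the file it wanted to replace, the retry against the new snapshot succeeds, and the insert-only batch commits without any conflict.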

Another way to do compaction is auto compaction. We put the compaction operations between the streaming batches. There is no conflict because of the sequential execution. Currently, we support a file count strategy to determine whether we should do a compaction while the stream is running. For example, if we find that the file count is greater than our threshold setting, we will trigger compaction. The third way to reduce small files is using adaptive execution. Adaptive execution can automatically merge small partitions to decrease the number of reducers and so decrease the number of output files. So far, I have introduced the small files issue and provided some solutions.

Next, I will introduce another issue, which is a performance issue. Because the implementation of Delta merge launches two join jobs to do the merge logic, if the target Delta table gets larger, the join operations will take more time.
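The file count strategy for auto compaction described above can be sketched as a small Python simulation. The numbers and names are hypothetical, and the sketch assumes that a compaction rewrites all current files into a single file, which is a simplification.

```python
def run_batches(num_batches, files_per_batch, max_files):
    """Return the table's file count after each micro-batch.

    After every batch, compaction runs only if the file count exceeds
    max_files. Because it runs between batches, it never overlaps a merge.
    """
    file_count = 0
    history = []
    for _ in range(num_batches):
        file_count += files_per_batch  # each merge writes new small files
        if file_count > max_files:     # file count strategy
            file_count = 1             # assumed: compaction leaves one file
        history.append(file_count)
    return history


history = run_batches(num_batches=6, files_per_batch=4, max_files=10)
print(history)
```

With four new files per batch and a threshold of ten, compaction is triggered on every third batch, so the file count stays bounded instead of growing for as long as the job runs.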

The slower joins decrease performance and affect long-running stability. A runtime filter can help to improve the performance of the join, especially when the streaming batch's source data is small. Here's the (murmurs) we submitted. And last, I will introduce the future work of the CDC solution. One is automatic schema change detection. We can automatically detect a schema change and handle it while the job is running, so there's no need to stop the streaming job to handle it. Another is performance improvement. We want to implement the merge-on-read feature, so we can shift the merge cost to the read side to prevent the processing time of each batch from increasing. We also want to simplify the user experience with a sync statement. We can hide the binlog replay logic; the user just tells us the source binlog format type and the target Delta table. That's all, thank you.

As found on YouTube
