Monday 20 April 2020

Flume Demo: Sink as HDFS

Objective:

In this blog post, I will explain how to run a Flume agent with HDFS as the sink.

Step 1: 

Create an agent conf file "example-hdfs.conf" as below, with the agent name "a1", a memory channel, and an HDFS sink that writes to "/user/manoj/flume/". Make sure the directory "/user/manoj/flume/" exists in HDFS before starting the agent.
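The directory check mentioned above can be sketched with the standard HDFS shell. This is a minimal sketch, assuming the "hdfs" client is on the PATH and the current user may write under "/user/manoj"; the variable name FLUME_DIR is only illustrative.

```shell
# Target directory taken from the post; adjust it for your own HDFS user.
FLUME_DIR="/user/manoj/flume"

if command -v hdfs >/dev/null 2>&1; then
    # -p creates missing parent directories and does not fail if the path exists
    hdfs dfs -mkdir -p "$FLUME_DIR"
    # -d lists the directory entry itself rather than its contents
    hdfs dfs -ls -d "$FLUME_DIR"
else
    echo "hdfs client not found on PATH; run this on a Hadoop node"
fi
```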

The conf file "example-hdfs.conf" looks as below. The channel type (a1.channels.c1.type) and the HDFS location (a1.sinks.k1.hdfs.path) are the settings to note:

-------
# example-hdfs.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = hdfs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Customizing sink for HDFS

a1.sinks.k1.hdfs.path = /user/manoj/flume
a1.sinks.k1.hdfs.filePrefix = netcat


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
---------

Step 2:

Run the flume agent as below:
-----
flume-ng agent --name a1 --conf /home/manoj/flume/conf --conf-file /home/manoj/flume/conf/example-hdfs.conf
------
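While testing, it can help to see the agent's log in the same terminal. The sketch below adds the logger property used in the Flume user guide's quick start. It assumes a log4j 1.x based Flume release (1.9 or earlier) and reuses the paths from the command above; the variable names are only illustrative. Note that the value of --name has to match the "a1" prefix used in the conf file.

```shell
# Paths and agent name as used in this post.
FLUME_CONF_DIR="/home/manoj/flume/conf"
AGENT_CONF="$FLUME_CONF_DIR/example-hdfs.conf"
AGENT_NAME="a1"

if command -v flume-ng >/dev/null 2>&1; then
    # Runs in the foreground and prints INFO-level log lines to the console
    flume-ng agent --name "$AGENT_NAME" --conf "$FLUME_CONF_DIR" \
        --conf-file "$AGENT_CONF" -Dflume.root.logger=INFO,console
else
    echo "flume-ng not found on PATH"
fi
```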

Step 3:

Open a telnet connection to localhost on port 44444 (the port set in the conf file) from another terminal and type a few lines of text. The netcat source turns each line into one event.
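For an interactive session, "telnet localhost 44444" is enough. As a non-interactive alternative, a single test line can be sent with nc. This is a sketch, assuming nc is installed and the agent from Step 2 is running on the same machine; the message text and variable names are only illustrative.

```shell
# Host and port must match a1.sources.r1.bind and a1.sources.r1.port.
HOST="localhost"
PORT=44444

if command -v nc >/dev/null 2>&1; then
    # Send one line as one event; -w 2 gives up after 2 seconds of inactivity
    printf 'hello flume\n' | nc -w 2 "$HOST" "$PORT" \
        || echo "could not connect to $HOST:$PORT (is the agent running?)"
else
    echo "nc not found; use: telnet $HOST $PORT"
fi
```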


Step 4:

We can see that the data is written to the HDFS location "/user/manoj/flume". As per the agent configuration file, the channel is memory and the sink is the HDFS location.
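The result can be checked by listing the target directory. This is a sketch, assuming the "hdfs" client is on the PATH. With the HDFS sink defaults, a file that is still being written carries a ".tmp" suffix until it is rolled.

```shell
# HDFS location configured in a1.sinks.k1.hdfs.path.
FLUME_DIR="/user/manoj/flume"

if command -v hdfs >/dev/null 2>&1; then
    # Lists the files the sink has created so far
    hdfs dfs -ls "$FLUME_DIR"
else
    echo "hdfs client not found on PATH; run this on a Hadoop node"
fi
```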



NOTE:

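In order to have the data written in plain text format, we need to modify the agent conf as below. Compared to the earlier conf, the parameter "a1.sinks.k1.hdfs.fileType = DataStream" is added (the default fileType is SequenceFile). The parameter "a1.sinks.k1.hdfs.rollInterval" is also added, which sets the time-based roll interval to 120 seconds.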


------------
# example-hdfs.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = hdfs

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Customizing sink for HDFS

a1.sinks.k1.hdfs.path = /user/manoj/flume
a1.sinks.k1.hdfs.filePrefix = netcat
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 120

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
---------
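With fileType set to DataStream, the files can be read back directly as text. This is a sketch, assuming the "hdfs" client is on the PATH and that the directory holds only files written by this agent.

```shell
# HDFS location configured in a1.sinks.k1.hdfs.path.
FLUME_DIR="/user/manoj/flume"

if command -v hdfs >/dev/null 2>&1; then
    # The glob is quoted so that HDFS, not the local shell, expands it
    hdfs dfs -cat "$FLUME_DIR/*"
else
    echo "hdfs client not found on PATH; run this on a Hadoop node"
fi
```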

Please continue to read the follow-up posts as well :)
