Kafka序列化 之自定义序列化

自定义序列化

Kafka 是以字节的方式发送数据,这样就涉及到对象的序列化和反序列化。Kafka自身提供了一些序列化工具,如StringSerializer,但是自带的序列化工具不能满足所有的使用场景,有些时候需要自定义序列化工具。

示例代码

在实际使用当中,一般会使用通用的序列化工具,如 Avro、Thrift、Protobuf 等;以下代码仅出于学习的目的,使用自定义的序列化工具。

消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Immutable message payload used by the custom serializer/deserializer examples.
 *
 * <p>Holds a numeric customer id and an optional (possibly {@code null}) name.
 */
public class Customer {
    private final int customerID;
    private final String customerName;

    /**
     * @param ID   numeric customer identifier
     * @param name customer name; may be {@code null}
     */
    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    /** @return the numeric customer identifier */
    public int getID() {
        return customerID;
    }

    /**
     * Conventionally-cased alias of {@link #getID()}.
     *
     * <p>Kept alongside {@code getID()} so that callers using either spelling compile.
     *
     * @return the numeric customer identifier
     */
    public int getId() {
        return customerID;
    }

    /** @return the customer name, or {@code null} if none was supplied */
    public String getName() {
        return customerName;
    }

    /** Human-readable form so that logging a customer shows its contents instead of an identity hash. */
    @Override
    public String toString() {
        return "Customer{id=" + customerID + ", name=" + customerName + "}";
    }
}

序列化工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name is
Null)
N bytes representing customerName in UTF-8
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializeName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to
byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Producer configuration: String keys, Customer values encoded by the custom serializer.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.test.CustomerSerializer");
String topic = "customerContacts";
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone ctrl-c
while (true) {
    Customer customer = new Customer(1, "test");
    System.out.println("Generated customer " + customer.toString());
    // The key serializer is StringSerializer, so the int id must be converted to a String.
    ProducerRecord<String, Customer> record =
            new ProducerRecord<>(topic, String.valueOf(customer.getID()), customer);
    producer.send(record);
}

反序列化工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerDeserializer implements Deserializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by
IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
Deserializers | 89
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name);
} catch (Exception e) {
throw new SerializationException("Error when serializing Customerto byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Consumer configuration: String keys, Customer values decoded by the custom deserializer.
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.test.CustomerDeserializer");
KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
// subscribe() takes a collection of topic names. The topic matches the one the
// producer example writes to ("customerContacts") so the two examples work together.
consumer.subscribe(java.util.Collections.singletonList("customerContacts"));
while (true) {
    // NOTE(review): newer client versions offer poll(java.time.Duration) and deprecate
    // poll(long) — confirm against the client version in use.
    ConsumerRecords<String, Customer> records = consumer.poll(100);
    for (ConsumerRecord<String, Customer> record : records) {
        Customer customer = record.value();
        if (customer == null) {
            // The deserializer returns null for a null payload; nothing to print.
            continue;
        }
        System.out.println("current customer Id: " + customer.getID()
                + " and current customer name: " + customer.getName());
    }
}