When working with Kafka Connect, you often need to transform records on-the-fly—either before inserting them into Kafka (source connectors) or before exporting them from Kafka (sink connectors). Confluent offers a variety of ready-to-use Single Message Transformations (SMTs) such as Cast, ExtractField, InsertField, etc. However, there are times when you need specialized logic that these built-in transformations do not cover. In that case, building a custom Single Message Transformation can give you full control over how data is manipulated.
In this article, we’ll create a custom Single Message Transformation to handle the following scenario:
1. We receive JSON messages containing fields such as id, firstName, lastName, phoneNumber, and email.
2. We want to output a record that maps to a relational table with columns:
id, fullName, phoneNumber, email
3. The transformation must combine fields (e.g., firstName + lastName → fullName) based on a configurable JSON definition.
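For example, given an input message {"id":"123g2","firstName":"John","lastName":"Doe","phoneNumber":"0501234567","email":"john.doe@gmail.com"} (the sample used in the unit test later in this article), the transformation should produce a record whose fullName field contains "John Doe".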
Below is a step-by-step guide to achieving this with a custom Single Message Transformation in Java, along with all the necessary Maven dependencies, configuration, and a minimal connector configuration example.
Why a Custom Single Message Transformation?
Flexible Field Mappings: You can define which input fields map to which output fields, including concatenating multiple fields into one.
Easy Extension: You can extend or modify this transformation with domain-specific logic (e.g., custom validations, data type conversions, or specialized transformations).
Separation of Concerns: Keep your data transformation logic within Kafka Connect, rather than in your producer or consumer code.
Step 1: Create a New Maven Project
Create a new Maven project, for instance in an IDE like IntelliJ or Eclipse. In your pom.xml, include the following dependencies to support Kafka Connect APIs, JSON processing, and unit testing.
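A minimal dependencies section might look like the following. Version numbers here are illustrative; align connect-api with the Kafka version your Connect cluster runs, and mark it provided since the worker supplies it at runtime:

<dependencies>
    <!-- Kafka Connect API: Transformation, ConnectRecord, Schema, Struct -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>connect-api</artifactId>
        <version>3.6.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- org.json: parses the incoming record value -->
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20231013</version>
    </dependency>
    <!-- Jackson: deserializes the outputFields mapping configuration -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <!-- JUnit 5 for unit tests -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.10.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>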
This ensures you have everything required to develop and test your custom transformation.
Step 2: Define the Transformation Configuration
We want a flexible way to tell our custom SMT which input JSON fields map to which output field. We’ll pass a JSON array of mappings as the outputFields property. Here’s a sample configuration:
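This is the same mapping exercised by the unit test later in the article. Each entry names one output field and lists the input fields whose values are concatenated (space-separated) to build it:

[
  {"outputFieldName": "id", "inputFields": ["id"]},
  {"outputFieldName": "fullName", "inputFields": ["firstName", "lastName"]},
  {"outputFieldName": "phoneNumber", "inputFields": ["phoneNumber"]},
  {"outputFieldName": "email", "inputFields": ["email"]}
]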
Step 3: Implement the Transformation Class
Below is the entire Java class implementing Transformation<R>. It reads the custom configuration, parses the record’s JSON payload, and produces a Struct with the new field mappings.
package com.kafkawizamr;
import com.kafkawizamr.config.CustomConfig;
import com.kafkawizamr.model.OutputFieldsMapping;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.*;
public class CustomSingleMessageTransformationApp<R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF;
private List<OutputFieldsMapping> outputFields;
@Override
public void configure(Map<String, ?> configs) {
CustomConfig config = new CustomConfig(CONFIG_DEF, configs);
this.outputFields = config.getListOfOutputFields("outputFields");
}
@Override
public R apply(R record) {
if (record.value() != null) {
JSONObject obj;
try {
obj = new JSONObject(record.value().toString());
} catch (JSONException e) {
throw new RuntimeException("Bad message -> " + record.value().toString());
}
SchemaBuilder schemaStruct = SchemaBuilder.struct();
Map<String, String> tempStruct = new HashMap<>(); // output field name -> concatenated value
for (OutputFieldsMapping outputField : outputFields) {
Schema tempSchema = Schema.OPTIONAL_STRING_SCHEMA;
schemaStruct.field(outputField.getOutputFieldName(), tempSchema); // register the output field in the schema
List<String> fieldValues = new ArrayList<>();
outputField.getInputFields().forEach(field -> fieldValues.add(obj.getString(field)));
String outputFieldValue = String.join(" ", fieldValues);
tempStruct.put(outputField.getOutputFieldName(), outputFieldValue);
}
Struct valueStruct = new Struct(schemaStruct.schema());
tempStruct.forEach(valueStruct::put);
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), schemaStruct.schema(), valueStruct, record.timestamp());
} else {
throw new RuntimeException("Message value is null");
}
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
static {
ConfigDef configDef = new ConfigDef();
configDef.define("outputFields", Type.STRING, null, new ConfigDef.NonEmptyString(), Importance.HIGH, "The output fields required and their mapping.");
CONFIG_DEF = configDef;
}
}
Key points:
Implements Transformation<R>, the Kafka Connect interface for custom transformations.
Expects a JSON string for the record’s value.
Reads and applies the outputFields configurations to build a new Struct.
Uses String.join(" ", fieldValues) to concatenate fields like firstName and lastName into a single fullName value.
Step 4: Configuration Class
To make our code more readable, we store the logic to parse JSON-based configuration in a separate CustomConfig class:
package com.kafkawizamr.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kafkawizamr.model.OutputFieldsMapping;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import java.util.List;
import java.util.Map;
public class CustomConfig extends AbstractConfig {
public CustomConfig(ConfigDef configDef, Map<?, ?> originals) {
super(configDef, originals, false);
}
public List<OutputFieldsMapping> getListOfOutputFields(String key) {
if (getString(key) != null) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(getString(key), new TypeReference<List<OutputFieldsMapping>>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
} else {
return null;
}
}
}
Step 5: Output Fields Model
This is a simple POJO that holds each output field name and the list of input fields that build it:
package com.kafkawizamr.model;
import java.io.Serializable;
import java.util.List;
public class OutputFieldsMapping implements Serializable {
private String outputFieldName;
private List<String> inputFields;
public OutputFieldsMapping() {
}
public OutputFieldsMapping(String outputFieldName, List<String> inputFields) {
this.outputFieldName = outputFieldName;
this.inputFields = inputFields;
}
public String getOutputFieldName() {
return outputFieldName;
}
public void setOutputFieldName(String outputFieldName) {
this.outputFieldName = outputFieldName;
}
public List<String> getInputFields() {
return inputFields;
}
public void setInputFields(List<String> inputFields) {
this.inputFields = inputFields;
}
@Override
public String toString() {
return "OutputFieldsMapping{" +
"outputFieldName='" + outputFieldName + '\'' +
", inputFields=" + inputFields +
'}';
}
}
Step 6: Testing the Transformation
Here’s a simple JUnit test that illustrates how to use our custom SMT. The test:
Creates the transformation instance.
Passes the JSON-based mapping configuration.
Applies the transformation to a sample record with {"id":"123g2","firstName":"John","lastName":"Doe",...}.
Prints the resulting schema fields and values.
package com.kafkawizamr;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
public class CustomSingleMessageTransformationAppTest {
@AfterEach
public void teardown() {
}
@Test
public void test() {
try (CustomSingleMessageTransformationApp<SinkRecord> testSMT = new CustomSingleMessageTransformationApp<>()) {
Map<String, String> stringMap = new HashMap<>();
stringMap.put("outputFields", "[{\"outputFieldName\":\"id\",\"inputFields\":[\"id\"]},{\"outputFieldName\":\"fullName\",\"inputFields\":[\"firstName\",\"lastName\"]},{\"outputFieldName\":\"phoneNumber\",\"inputFields\":[\"phoneNumber\"]},{\"outputFieldName\":\"email\",\"inputFields\":[\"email\"]}]");
testSMT.configure(stringMap);
String sampleMessage = "{\"id\":\"123g2\",\"firstName\":\"John\",\"lastName\":\"Doe\",\"phoneNumber\":\"0501234567\",\"email\":\"john.doe@gmail.com\"}";
final SinkRecord record = new SinkRecord("test-topic", 0, null, null, null, sampleMessage, 0);
final SinkRecord transformedRecord = testSMT.apply(record);
transformedRecord.valueSchema().fields().forEach(f -> System.out.println(f.name()));
System.out.println("Key:" + transformedRecord.key());
System.out.println("Value:" + transformedRecord.value());
transformedRecord.valueSchema().schema().fields().forEach(System.out::println);
}
}
}
Run this test, and if everything is configured correctly, you’ll see the new schema fields (id, fullName, phoneNumber, email) logged to your console.
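The console output should look roughly like this (the exact Struct and Field toString formats depend on your Kafka version), followed by one line per Field printed by the final loop:

id
fullName
phoneNumber
email
Key:null
Value:Struct{id=123g2,fullName=John Doe,phoneNumber=0501234567,email=john.doe@gmail.com}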
Step 7: Build and Deploy the JAR
Build the JAR: Run mvn clean package.
Copy the JAR: Place the generated JAR file into your Kafka Connect plugin path (for Confluent Platform, often something like /usr/local/share/kafka/plugins/ or a custom directory you’ve configured).
Restart the Connect Worker: Kafka Connect needs to reload plugins to recognize your new transformation.
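For reference, the Connect worker discovers plugins through its plugin.path property (set in connect-distributed.properties or connect-standalone.properties), which must include the directory holding your JAR, for example:

plugin.path=/usr/local/share/kafka/plugins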
Step 8: Minimal Connector Configuration Example
Below is a simple snippet for a sink connector (e.g., JDBC Sink) that uses your custom transformation. Assume you have a JDBC Sink that writes to a MySQL table with columns matching id, fullName, phoneNumber, email.
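A minimal sketch follows, assuming the Confluent JDBC Sink connector, placeholder MySQL connection settings, and the mapping from Step 2. Note that value.converter is set to StringConverter because our SMT parses the raw JSON string itself:

{
  "name": "custom-smt-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "my-transformed-topic",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "connection.user": "mysql-user",
    "connection.password": "mysql-password",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms": "customMap",
    "transforms.customMap.type": "com.kafkawizamr.CustomSingleMessageTransformationApp",
    "transforms.customMap.outputFields": "[{\"outputFieldName\":\"id\",\"inputFields\":[\"id\"]},{\"outputFieldName\":\"fullName\",\"inputFields\":[\"firstName\",\"lastName\"]},{\"outputFieldName\":\"phoneNumber\",\"inputFields\":[\"phoneNumber\"]},{\"outputFieldName\":\"email\",\"inputFields\":[\"email\"]}]"
  }
}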
Kafka Connect will read messages from my-transformed-topic.
It will apply the CustomSingleMessageTransformationApp.
The resulting Struct will have the columns id, fullName, phoneNumber, email.
The JDBC Sink will insert those columns into the target MySQL table.
Conclusion
Implementing a Custom Single Message Transformation in Kafka Connect provides unmatched flexibility for data manipulation, allowing you to tailor the mapping logic exactly to your needs. You can now:
Concatenate multiple fields into one (e.g., firstName + lastName = fullName).
Filter or mask fields in any way your business requires.
Reuse this logic across multiple connectors simply by referencing it in different connector configurations.
By following the steps above—creating a Maven project, building the custom transformation interface, testing with JUnit, and deploying the final JAR into Kafka Connect—you’ll have a solid blueprint for any kind of advanced, custom transformation use case.