
Dynamic MySQL ETL in Talend Using tPrejob, tLibraryLoad, and tJava
Last updated: August 2024
Quick answer: Build a dynamic MySQL ETL pipeline in Talend using just three components: tPrejob to initialize, tLibraryLoad to load the MySQL JDBC driver, and tJava to execute all ETL logic via pure Java JDBC. This approach handles schema discovery, dynamic table creation, schema synchronization, data loading, and audit logging inside a single tJava component.
Introduction
Building a dynamic MySQL ETL pipeline in Talend using tPrejob, tLibraryLoad, and tJava gives teams full control over SQL logic, schema evolution, and audit tracking without relying on multiple Talend components or licensed ETL tools. When working with large data volumes, review our Talend performance tuning guide and be aware of heap memory issues that can arise with large datasets.
This post walks through a dynamic MySQL ETL pipeline in Talend that uses only three components:
- tPrejob
- tLibraryLoad
- tJava
All ETL logic - including schema discovery, table creation, schema synchronization, data loading, and cumulative audit logging - is implemented using pure Java (JDBC) inside a single tJava component.
Why Use Only tPrejob, tLibraryLoad, and tJava?
Business Benefits
- Minimal Talend dependency
- Easier governance and audits
- Lower maintenance and operational cost
- Suitable for restricted or regulated environments
Technical Benefits
- Full SQL and schema transparency
- Metadata-driven ETL logic
- No dependency on Talend dynamic schemas
- Works consistently across Talend versions
Talend Job Architecture

Components Used
| Component | Responsibility |
|---|---|
| tPrejob | Job initialization |
| tLibraryLoad | Loads MySQL JDBC driver |
| tJava | Executes complete ETL logic |
This architecture is commonly used in advanced Talend jobs where developers prefer Java-driven orchestration over component-heavy pipelines.
Component-by-Component Explanation
1. tPrejob – Job Initialization
tPrejob ensures that initialization logic executes once before the ETL process starts. It controls execution order and prepares the runtime environment.
Best practice:
Always place driver loading and initialization logic inside tPrejob.
2. tLibraryLoad – JDBC Driver Management
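tLibraryLoad adds the MySQL JDBC driver JAR (MySQL Connector/J) to the job's classpath at runtime, so the driver class is available when tJava opens the connection.
Why this is critical:
- Prevents a ClassNotFoundException when tJava calls Class.forName("com.mysql.cj.jdbc.Driver")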
- Keeps jobs portable across environments
- Eliminates the need to rebuild jobs for driver updates
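As a minimal sketch of why the driver must be on the classpath before tJava runs, the check below tries to resolve a driver class by name and reports whether it is available. The class name `DriverCheck` and the method `isDriverAvailable` are hypothetical names used for illustration only; `com.mysql.cj.jdbc.Driver` is the driver class this job loads later.

```java
class DriverCheck {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isDriverAvailable(String driverClassName) {
        try {
            Class.forName(driverClassName);
            return true;
        } catch (ClassNotFoundException e) {
            // The JAR is not on the classpath (for example, tLibraryLoad did not run first).
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = "com.mysql.cj.jdbc.Driver";
        if (isDriverAvailable(driver)) {
            System.out.println("Driver found: " + driver);
        } else {
            System.out.println("Driver missing: " + driver);
        }
    }
}
```

A check like this could run at the top of the tJava code so the job fails with a clear message instead of a stack trace from deep inside the connection logic.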
3. tJava – Core ETL Logic
All ETL functionality is implemented inside a single tJava component, including:
- JDBC connection handling
- Dynamic source schema discovery
- Target database and table creation
- Schema synchronization
- Row-level data loading
- Cumulative audit logging
This approach provides maximum flexibility, transparency, and control.
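Because the job assembles SQL from configured database, table, and column names, it helps to quote identifiers in one place. The helper below is a sketch under that assumption: `SqlIdentifiers`, `quote`, and `qualified` are hypothetical names, and the example names in `main` are made up. It relies on MySQL's rule that a backtick inside a quoted identifier is escaped by doubling it.

```java
final class SqlIdentifiers {

    private SqlIdentifiers() { }

    /**
     * Wraps a MySQL identifier in backticks, doubling any embedded backtick.
     * Rejects null or empty names.
     */
    static String quote(String identifier) {
        if (identifier == null || identifier.isEmpty()) {
            throw new IllegalArgumentException("Identifier must not be empty");
        }
        return "`" + identifier.replace("`", "``") + "`";
    }

    /** Builds a database-qualified name such as `db`.`table`. */
    static String qualified(String database, String table) {
        return quote(database) + "." + quote(table);
    }

    public static void main(String[] args) {
        // Hypothetical names, for illustration only.
        System.out.println(qualified("sales_audit_db", "orders_dt"));
    }
}
```

Centralizing the quoting keeps the DDL and DML builders consistent and avoids broken statements when a name contains a reserved word or an unusual character.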
Sample tJava Code Used in This ETL Job
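The complete Java code goes inside the tJava component: the import statements in the Advanced settings tab, and everything else in the Basic settings code area.
/* Advanced settings: Import */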
import java.sql.*;
import java.util.*;
/* =========================
tJava Basic settings: CONFIGURATION
========================= */
String mysqlUrl = "jdbc:mysql://localhost:3306/";
String mysqlUser = "*****";
String mysqlPassword = "*******";
/* SOURCE */
String sourceDb = "*****";
String sourceTable = "******";
/* TARGET */
String targetDb = "*****_audit_db";
String targetTable = "******_dt";
/* AUDIT TABLE */
String auditTable = "etl_audit_log";
/* LOAD USER */
String loadUser = mysqlUser;
/* Fully qualified names */
String sourceFullTable = sourceDb + "." + sourceTable;
String targetFullTable = targetDb + "." + targetTable;
/* =========================
CONNECT
========================= */
Class.forName("com.mysql.cj.jdbc.Driver");
Connection conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword);
Statement readStmt = conn.createStatement();
Statement writeStmt = conn.createStatement();
/* =========================
CREATE TARGET DATABASE
========================= */
writeStmt.execute("CREATE DATABASE IF NOT EXISTS " + targetDb);
/* =========================
READ SOURCE COLUMNS
========================= */
// Typed list so later code can pass entries to getString(String) and iterate them as String
List<String> srcColumns = new ArrayList<>();
ResultSet srcColRs = readStmt.executeQuery(
    "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
    "WHERE TABLE_SCHEMA = '" + sourceDb + "' " +
    "AND TABLE_NAME = '" + sourceTable + "' " +
    "ORDER BY ORDINAL_POSITION"
);
while (srcColRs.next()) {
    srcColumns.add(srcColRs.getString("COLUMN_NAME"));
}
srcColRs.close();
/* =========================
CREATE TARGET DATA TABLE
========================= */
StringBuilder createTargetSQL = new StringBuilder(
    "CREATE TABLE IF NOT EXISTS " + targetFullTable + " ("
);
for (int i = 0; i < srcColumns.size(); i++) {
    createTargetSQL.append("`").append(srcColumns.get(i)).append("` VARCHAR(255)");
    if (i < srcColumns.size() - 1) {
        createTargetSQL.append(",");
    }
}
createTargetSQL.append(")");
writeStmt.execute(createTargetSQL.toString());
/* =========================
SYNC NEW SOURCE COLUMNS
========================= */
Set<String> tgtColumns = new HashSet<>();
ResultSet tgtColRs = readStmt.executeQuery(
    "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
    "WHERE TABLE_SCHEMA = '" + targetDb + "' " +
    "AND TABLE_NAME = '" + targetTable + "'"
);
while (tgtColRs.next()) {
    tgtColumns.add(tgtColRs.getString("COLUMN_NAME"));
}
tgtColRs.close();
// Add any source column that the target table does not have yet
for (String col : srcColumns) {
    if (!tgtColumns.contains(col)) {
        writeStmt.execute(
            "ALTER TABLE " + targetFullTable +
            " ADD COLUMN `" + col + "` VARCHAR(255)"
        );
    }
}
/* =========================
SOURCE ROW COUNT
========================= */
int sourceCount = 0;
ResultSet cntRs = readStmt.executeQuery(
    "SELECT COUNT(*) FROM " + sourceFullTable
);
if (cntRs.next()) {
    sourceCount = cntRs.getInt(1);
}
cntRs.close();
/* =========================
READ SOURCE DATA
========================= */
ResultSet srcRs = readStmt.executeQuery(
    "SELECT * FROM " + sourceFullTable
);
/* =========================
INSERT INTO TARGET DATA TABLE
========================= */
int insertedThisRun = 0;
// Backtick-quoted column list, built once and reused for every INSERT
StringBuilder colBuilder = new StringBuilder();
for (int i = 0; i < srcColumns.size(); i++) {
    colBuilder.append("`").append(srcColumns.get(i)).append("`");
    if (i < srcColumns.size() - 1) {
        colBuilder.append(",");
    }
}
while (srcRs.next()) {
    StringBuilder valBuilder = new StringBuilder("(");
    for (int i = 0; i < srcColumns.size(); i++) {
        String val = srcRs.getString(srcColumns.get(i));
        if (val == null) {
            valBuilder.append("NULL");
        } else {
            // Escape backslashes first, then single quotes.
            // Assumes the default sql_mode (NO_BACKSLASH_ESCAPES not set).
            String escaped = val.replace("\\", "\\\\").replace("'", "''");
            valBuilder.append("'").append(escaped).append("'");
        }
        if (i < srcColumns.size() - 1) {
            valBuilder.append(",");
        }
    }
    valBuilder.append(")");
    writeStmt.execute(
        "INSERT INTO " + targetFullTable +
        " (" + colBuilder + ") VALUES " + valBuilder
    );
    insertedThisRun++;
}
srcRs.close();
/* =========================
CREATE AUDIT TABLE
========================= */
writeStmt.execute(
    "CREATE TABLE IF NOT EXISTS " + targetDb + "." + auditTable + " (" +
    "id INT AUTO_INCREMENT PRIMARY KEY," +
    "source_table_name VARCHAR(200)," +
    "target_table_name VARCHAR(200)," +
    "source_count INT," +
    "target_count INT," +
    "load_user VARCHAR(100)," +
    "load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP" +
    ")"
);
/* =========================
READ PREVIOUS TARGET COUNT
========================= */
int previousTargetCount = 0;
ResultSet prevAuditRs = readStmt.executeQuery(
    "SELECT target_count FROM " + targetDb + "." + auditTable +
    " WHERE target_table_name = '" + targetFullTable + "'" +
    " ORDER BY load_timestamp DESC LIMIT 1"
);
if (prevAuditRs.next()) {
    previousTargetCount = prevAuditRs.getInt("target_count");
}
prevAuditRs.close();
/* =========================
INSERT AUDIT RECORD (CUMULATIVE)
========================= */
int cumulativeTargetCount = previousTargetCount + insertedThisRun;
writeStmt.execute(
    "INSERT INTO " + targetDb + "." + auditTable +
    " (source_table_name, target_table_name, source_count, target_count, load_user) VALUES (" +
    "'" + sourceFullTable + "'," +
    "'" + targetFullTable + "'," +
    sourceCount + "," +
    cumulativeTargetCount + "," +
    "'" + loadUser + "'" +
    ")"
);
/* =========================
CLOSE
========================= */
readStmt.close();
writeStmt.close();
conn.close();
System.out.println("JOB COMPLETED SUCCESSFULLY");
System.out.println("SOURCE COUNT = " + sourceCount);
System.out.println("INSERTED THIS RUN = " + insertedThisRun);
System.out.println("TARGET TOTAL COUNT = " + cumulativeTargetCount);
System.out.println("AUDIT ROW INSERTED");
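The job above builds each INSERT by concatenating escaped values and runs one statement per row. One possible alternative, sketched here rather than taken from the original job, is a parameterized `PreparedStatement` with JDBC batching. `BatchLoader`, `buildInsertSql`, `copyRows`, and the `batchSize` parameter are hypothetical names; the JDBC calls (`prepareStatement`, `setString`, `addBatch`, `executeBatch`) are standard `java.sql` API.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

final class BatchLoader {

    private BatchLoader() { }

    /** Builds "INSERT INTO <table> (`a`,`b`) VALUES (?,?)" for the given columns. */
    static String buildInsertSql(String targetFullTable, List<String> columns) {
        StringBuilder cols = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                cols.append(",");
                marks.append(",");
            }
            cols.append("`").append(columns.get(i)).append("`");
            marks.append("?");
        }
        return "INSERT INTO " + targetFullTable + " (" + cols + ") VALUES (" + marks + ")";
    }

    /** Copies all source rows to the target in batches and returns the number of rows sent. */
    static int copyRows(Connection conn, String sourceFullTable, String targetFullTable,
                        List<String> columns, int batchSize) throws SQLException {
        int copied = 0;
        String insertSql = buildInsertSql(targetFullTable, columns);
        try (Statement read = conn.createStatement();
             ResultSet rs = read.executeQuery("SELECT * FROM " + sourceFullTable);
             PreparedStatement insert = conn.prepareStatement(insertSql)) {
            while (rs.next()) {
                for (int i = 0; i < columns.size(); i++) {
                    // A null value is sent as SQL NULL, so no manual quoting is needed.
                    insert.setString(i + 1, rs.getString(columns.get(i)));
                }
                insert.addBatch();
                copied++;
                if (copied % batchSize == 0) {
                    insert.executeBatch();
                }
            }
            insert.executeBatch(); // flush the final partial batch
        }
        return copied;
    }
}
```

Binding values as parameters removes the need for hand-written escaping, and try-with-resources closes the statements even if a row fails. How much batching helps depends on the driver and its settings, so measure before relying on it.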
Audit Logging Strategy
This ETL uses a cumulative audit model, meaning:
- Each execution inserts one audit record
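- Target record count is cumulative: each run's target_count is the previous audit row's target_count plus the rows inserted in that run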
- Historical load information is preserved
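To make the cumulative model concrete, here is a small in-memory simulation of the audit log. The class `AuditSimulation`, its nested `AuditRow`, and the run figures are hypothetical; the arithmetic mirrors `previousTargetCount + insertedThisRun` from the job.

```java
import java.util.ArrayList;
import java.util.List;

final class AuditSimulation {

    /** One audit row: the source row count and the running target total. */
    static final class AuditRow {
        final int sourceCount;
        final int targetCount;

        AuditRow(int sourceCount, int targetCount) {
            this.sourceCount = sourceCount;
            this.targetCount = targetCount;
        }
    }

    private final List<AuditRow> log = new ArrayList<>();

    /** Appends an audit row whose targetCount is the previous total plus rows inserted. */
    AuditRow recordRun(int sourceCount, int insertedThisRun) {
        int previous = log.isEmpty() ? 0 : log.get(log.size() - 1).targetCount;
        AuditRow row = new AuditRow(sourceCount, previous + insertedThisRun);
        log.add(row);
        return row;
    }

    public static void main(String[] args) {
        AuditSimulation audit = new AuditSimulation();
        // Hypothetical runs: the job reloads the full source table each time.
        System.out.println(audit.recordRun(100, 100).targetCount); // prints 100
        System.out.println(audit.recordRun(120, 120).targetCount); // prints 220
    }
}
```

Because the job reloads every source row on each run (there is no incremental logic), target_count grows by the full source size each time, which is worth keeping in mind when reconciling source_count against target_count.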
Audit Benefits
- Accurate data reconciliation
- Compliance and governance reporting
- Load trend analysis
- Simplified operational troubleshooting
End-to-End ETL Flow
1. tPrejob initializes job execution
2. tLibraryLoad loads the MySQL JDBC driver
3. tJava performs:
- Schema discovery
- Target table creation
- Schema synchronization
- Data loading
- Audit logging
4. Job completes with clear execution metrics
Pros and Cons
Advantages
- Minimal Talend components
- Full SQL and schema control
- Automatic schema adaptation for new source columns (dropped or retyped columns are not handled)
- Strong audit and governance
- Version-independent design
Limitations
- Row-by-row inserts may impact performance. For bulk-loading alternatives, see our comparison of tDBOutput, tDBOutputBulk, and tDBBulkExec
- No incremental or CDC logic
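- All target columns are created as VARCHAR(255), so source data types are lost and values longer than 255 characters may be truncated or rejected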
- Manual error handling required
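One way to soften the VARCHAR limitation, sketched here as an assumption rather than part of the original job, is to read `COLUMN_TYPE` alongside `COLUMN_NAME` from `INFORMATION_SCHEMA.COLUMNS` and reuse it in the target DDL. `TypedDdl` and `buildCreateTable` are hypothetical names, and the table and column names in `main` are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TypedDdl {

    private TypedDdl() { }

    /**
     * Builds CREATE TABLE DDL from an ordered map of column name to MySQL type,
     * for example the COLUMN_NAME and COLUMN_TYPE values of INFORMATION_SCHEMA.COLUMNS.
     */
    static String buildCreateTable(String targetFullTable, Map<String, String> columnTypes) {
        StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(targetFullTable).append(" (");
        boolean first = true;
        for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
            if (!first) {
                sql.append(",");
            }
            sql.append("`").append(entry.getKey()).append("` ").append(entry.getValue());
            first = false;
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the columns in source order.
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("id", "int");
        cols.put("amount", "decimal(10,2)");
        System.out.println(buildCreateTable("demo_audit_db.orders_dt", cols));
    }
}
```

This carries over only the base type; nullability, defaults, keys, and auto-increment settings would still need separate handling, and the insert logic would need typed setters instead of strings to benefit fully.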
Real-World Use Cases
- MySQL-to-MySQL data migration
- Audit-driven ETL pipelines
- Lightweight data warehousing
- Metadata-driven ingestion frameworks
- Controlled enterprise batch jobs
Conclusion
This Talend ETL pattern demonstrates that robust, auditable, and production-ready pipelines can be built using only tPrejob, tLibraryLoad, and tJava. By leveraging JDBC and metadata-driven logic, teams gain full control, transparency, and flexibility without overengineering their Talend jobs.
Frequently Asked Questions
Q: What are tPrejob, tLibraryLoad, and tJava used for in Talend ETL?
tPrejob runs initialization tasks before the main job starts, tLibraryLoad loads external libraries such as JDBC drivers at runtime, and tJava allows you to write custom Java code for ETL logic including schema discovery, table creation, data loading, and audit logging.
Q: Can I build a full ETL pipeline in Talend with just three components?
Yes. By combining tPrejob, tLibraryLoad, and tJava, you can build a complete dynamic MySQL ETL pipeline that handles schema discovery, table creation, schema synchronization, data loading, and cumulative audit logging using pure JDBC inside tJava.
Q: What are the limitations of using tJava for all ETL logic?
The main limitations include row-by-row inserts that may impact performance, no built-in incremental or CDC logic, all columns stored as VARCHAR by default, and the need for manual error handling.