Dynamic MySQL ETL in Talend Using tPrejob, tLibraryLoad, and tJava

Celestinfo Software Solutions Pvt. Ltd. Aug 06, 2024

Last updated: August 2024

Quick answer: Build a dynamic MySQL ETL pipeline in Talend using just three components: tPrejob to initialize, tLibraryLoad to load the MySQL JDBC driver, and tJava to execute all ETL logic via pure Java JDBC. This approach handles schema discovery, dynamic table creation, schema synchronization, data loading, and audit logging inside a single tJava component.

Introduction

Building a dynamic MySQL ETL pipeline in Talend using tPrejob, tLibraryLoad, and tJava gives teams full control over SQL logic, schema evolution, and audit tracking without relying on multiple Talend components or licensed ETL tools. For large data volumes, review our Talend performance tuning guide. This job reads the whole source table with a single SELECT *, and MySQL Connector/J buffers the complete result set in memory by default, so large tables can cause heap memory issues.


This blog explains how to build a dynamic MySQL ETL pipeline in Talend using only three components: tPrejob, tLibraryLoad, and tJava.

All ETL logic, including schema discovery, table creation, schema synchronization, data loading, and cumulative audit logging, is implemented in pure Java (JDBC) inside a single tJava component.

Why Use Only tPrejob, tLibraryLoad, and tJava?

Business Benefits



Technical Benefits


Talend Job Architecture


Figure: Dynamic MySQL ETL in Talend (job architecture)

Components Used


Component      Responsibility
tPrejob        Job initialization
tLibraryLoad   Loads the MySQL JDBC driver
tJava          Executes the complete ETL logic

This architecture suits advanced Talend jobs in which developers prefer Java-driven orchestration to component-heavy pipelines.


Component-by-Component Explanation


1. tPrejob – Job Initialization

tPrejob ensures that initialization logic executes once before the ETL process starts. It controls execution order and prepares the runtime environment.


Best practice:


Always place driver loading and initialization logic inside tPrejob.


2. tLibraryLoad – JDBC Driver Management


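tLibraryLoad loads the MySQL JDBC driver JAR (MySQL Connector/J) at runtime so that the tJava code can use it.

Why this is critical: the tJava code calls Class.forName("com.mysql.cj.jdbc.Driver") before it opens a connection. If the driver JAR has not been loaded, that call throws a ClassNotFoundException and the job fails before any data is read.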
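
A fail-fast check can make a missing driver obvious. The sketch below, which could run in a tJava placed after tLibraryLoad, tries to load the driver class and reports whether it is on the classpath. The class and method names (DriverCheck, isDriverAvailable) are hypothetical; only Class.forName is a standard Java API.

```java
/*
 * Hypothetical fail-fast check, e.g. for a tJava that runs after tLibraryLoad.
 * Class.forName throws ClassNotFoundException when the class is not on the classpath.
 */
class DriverCheck {

    /** Returns true if the named driver class can be loaded. */
    static boolean isDriverAvailable(String driverClassName) {
        try {
            Class.forName(driverClassName);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String driver = "com.mysql.cj.jdbc.Driver";
        if (isDriverAvailable(driver)) {
            System.out.println("MySQL driver loaded: " + driver);
        } else {
            // In a Talend job you would typically throw here to stop the job early.
            System.out.println("MySQL driver NOT found: " + driver
                    + " (check the tLibraryLoad configuration)");
        }
    }
}
```

Returning a boolean instead of throwing lets the caller decide whether a missing driver should stop the job or only log a warning.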


3. tJava – Core ETL Logic


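All ETL functionality is implemented inside a single tJava component, including schema discovery from INFORMATION_SCHEMA, dynamic creation of the target database and table, schema synchronization, data loading, and cumulative audit logging.
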
This approach provides maximum flexibility, transparency, and control.
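
The dynamic table-creation step comes down to turning a discovered column list into DDL. Below is a minimal sketch of that step as a standalone method, assuming every column is mapped to VARCHAR(255) as in the job's code. Unlike the job's code, it also quotes the database and table names and doubles any backtick inside an identifier, which is how MySQL escapes a backtick within a backtick-quoted name. The class, method, and table names are hypothetical.

```java
import java.util.List;

class DdlBuilder {

    /** Wraps a MySQL identifier in backticks, doubling any embedded backtick. */
    static String quoteIdent(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    /** Builds CREATE TABLE IF NOT EXISTS with every column typed VARCHAR(255). */
    static String createTableSql(String db, String table, List<String> columns) {
        if (columns.isEmpty()) {
            // An empty column list would produce invalid SQL: "CREATE TABLE ... ()"
            throw new IllegalArgumentException("No columns discovered for " + db + "." + table);
        }
        StringBuilder sql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(quoteIdent(db)).append('.').append(quoteIdent(table)).append(" (");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sql.append(",");
            }
            sql.append(quoteIdent(columns.get(i))).append(" VARCHAR(255)");
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // Hypothetical database, table, and column names
        System.out.println(createTableSql("sales_audit_db", "orders_dt", List.of("id", "customer_name")));
    }
}
```

Keeping DDL generation in a pure method means it can be unit-tested without a MySQL instance before it is pasted into tJava.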


Sample tJava Code Used in This ETL Job

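The complete Java code below is placed inside the tJava component: the import statements go in the component's Advanced settings tab, and the rest of the code goes in Basic settings. Credentials, database names, and table names are masked with asterisks; replace them with your own values.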


 /* =========================
    tJava Advanced settings: IMPORTS
    ========================= */
 
 import java.sql.*;
 import java.util.*;
 
 /* =========================
    tJava Basic settings: CONFIGURATION
    ========================= */
 
 String mysqlUrl = "jdbc:mysql://localhost:3306/";
 String mysqlUser = "*****";
 String mysqlPassword = "*******";
 
 /* SOURCE */
 String sourceDb = "*****";
 String sourceTable = "******";
 
 /* TARGET */
 String targetDb = "*****_audit_db";
 String targetTable = "******_dt";
 
 /* AUDIT TABLE */
 String auditTable = "etl_audit_log";
 
 /* LOAD USER */
 String loadUser = mysqlUser;
 
 /* Fully qualified names */
 String sourceFullTable = sourceDb + "." + sourceTable;
 String targetFullTable = targetDb + "." + targetTable;
 
 /* =========================
    CONNECT
    ========================= */
 
 Class.forName("com.mysql.cj.jdbc.Driver");
 Connection conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword);
 
 Statement readStmt  = conn.createStatement();
 Statement writeStmt = conn.createStatement();
 
 /* =========================
    CREATE TARGET DATABASE
    ========================= */
 
 writeStmt.execute("CREATE DATABASE IF NOT EXISTS " + targetDb);
 
 /* =========================
    READ SOURCE COLUMNS
    ========================= */
 
 List<String> srcColumns = new ArrayList<>();
 
 ResultSet srcColRs = readStmt.executeQuery(
     "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
     "WHERE TABLE_SCHEMA = '" + sourceDb + "' " +
     "AND TABLE_NAME = '" + sourceTable + "' " +
     "ORDER BY ORDINAL_POSITION"
 );
 
 while (srcColRs.next()) {
     srcColumns.add(srcColRs.getString("COLUMN_NAME"));
 }
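 srcColRs.close();
 
 /* Fail fast: no columns means the source table was not found, and the
    CREATE TABLE below would otherwise fail with an SQL syntax error */
 if (srcColumns.isEmpty()) {
     throw new RuntimeException("Source table not found: " + sourceFullTable);
 }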
 
 /* =========================
    CREATE TARGET DATA TABLE
    ========================= */
 
 StringBuilder createTargetSQL = new StringBuilder(
     "CREATE TABLE IF NOT EXISTS " + targetFullTable + " ("
 );
 
 for (int i = 0; i < srcColumns.size(); i++) {
     createTargetSQL.append("`").append(srcColumns.get(i)).append("` VARCHAR(255)");
     if (i < srcColumns.size() - 1) {
         createTargetSQL.append(",");
     }
 }
 createTargetSQL.append(")");
 
 writeStmt.execute(createTargetSQL.toString());
 
 /* =========================
    SYNC NEW SOURCE COLUMNS
    ========================= */
 
 Set<String> tgtColumns = new HashSet<>();
 
 ResultSet tgtColRs = readStmt.executeQuery(
     "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
     "WHERE TABLE_SCHEMA = '" + targetDb + "' " +
     "AND TABLE_NAME = '" + targetTable + "'"
 );
 
 while (tgtColRs.next()) {
     tgtColumns.add(tgtColRs.getString("COLUMN_NAME"));
 }
 tgtColRs.close();
 
 for (String col : srcColumns) {
     if (!tgtColumns.contains(col)) {
         writeStmt.execute(
             "ALTER TABLE " + targetFullTable +
             " ADD COLUMN `" + col + "` VARCHAR(255)"
         );
     }
 }
 
 /* =========================
    SOURCE ROW COUNT
    ========================= */
 
 int sourceCount = 0;
 ResultSet cntRs = readStmt.executeQuery(
     "SELECT COUNT(*) FROM " + sourceFullTable
 );
 if (cntRs.next()) {
     sourceCount = cntRs.getInt(1);
 }
 cntRs.close();
 
 /* =========================
    READ SOURCE DATA
    ========================= */
 
 ResultSet srcRs = readStmt.executeQuery(
     "SELECT * FROM " + sourceFullTable
 );
 
 /* =========================
    INSERT INTO TARGET DATA TABLE
    ========================= */
 
 int insertedThisRun = 0;
 
 StringBuilder colBuilder = new StringBuilder();
 StringBuilder placeholders = new StringBuilder();
 for (int i = 0; i < srcColumns.size(); i++) {
     colBuilder.append("`").append(srcColumns.get(i)).append("`");
     placeholders.append("?");
     if (i < srcColumns.size() - 1) {
         colBuilder.append(",");
         placeholders.append(",");
     }
 }
 
 /* Bind values as parameters instead of concatenating them into the SQL.
    Doubling single quotes is not enough: under MySQL's default SQL mode a
    backslash in the data (e.g. a value ending in "\") also breaks a quoted literal. */
 PreparedStatement insertStmt = conn.prepareStatement(
     "INSERT INTO " + targetFullTable +
     " (" + colBuilder + ") VALUES (" + placeholders + ")"
 );
 
 while (srcRs.next()) {
 
     for (int i = 0; i < srcColumns.size(); i++) {
         String val = srcRs.getString(srcColumns.get(i));
         if (val == null) {
             insertStmt.setNull(i + 1, java.sql.Types.VARCHAR);
         } else {
             insertStmt.setString(i + 1, val);
         }
     }
 
     /* Still one INSERT round trip per row */
     insertStmt.executeUpdate();
 
     insertedThisRun++;
 }
 insertStmt.close();
 srcRs.close();
 
 /* =========================
    CREATE AUDIT TABLE
    ========================= */
 
 writeStmt.execute(
     "CREATE TABLE IF NOT EXISTS " + targetDb + "." + auditTable + " (" +
     "id INT AUTO_INCREMENT PRIMARY KEY," +
     "source_table_name VARCHAR(200)," +
     "target_table_name VARCHAR(200)," +
     "source_count INT," +
     "target_count INT," +
     "load_user VARCHAR(100)," +
     "load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP" +
     ")"
 );
 
 /* =========================
    READ PREVIOUS TARGET COUNT
    ========================= */
 
 int previousTargetCount = 0;
 
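 /* Order by the AUTO_INCREMENT id rather than load_timestamp: TIMESTAMP has
    one-second resolution, so two runs in the same second would tie */
 ResultSet prevAuditRs = readStmt.executeQuery(
     "SELECT target_count FROM " + targetDb + "." + auditTable +
     " WHERE target_table_name = '" + targetFullTable + "'" +
     " ORDER BY id DESC LIMIT 1"
 );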
 
 if (prevAuditRs.next()) {
     previousTargetCount = prevAuditRs.getInt("target_count");
 }
 prevAuditRs.close();
 
 /* =========================
    INSERT AUDIT RECORD (CUMULATIVE)
    ========================= */
 
 int cumulativeTargetCount = previousTargetCount + insertedThisRun;
 
 writeStmt.execute(
     "INSERT INTO " + targetDb + "." + auditTable +
     " (source_table_name, target_table_name, source_count, target_count, load_user) VALUES (" +
     "'" + sourceFullTable + "'," +
     "'" + targetFullTable + "'," +
     sourceCount + "," +
     cumulativeTargetCount + "," +
     "'" + loadUser + "'" +
     ")"
 );
 
 /* =========================
    CLOSE
    ========================= */
 
 readStmt.close();
 writeStmt.close();
 conn.close();
 
 System.out.println("JOB COMPLETED SUCCESSFULLY");
 System.out.println("SOURCE COUNT        = " + sourceCount);
 System.out.println("INSERTED THIS RUN   = " + insertedThisRun);
 System.out.println("TARGET TOTAL COUNT = " + cumulativeTargetCount);
 System.out.println("AUDIT ROW INSERTED");
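
The schema-synchronization step in the tJava code issues an ALTER TABLE ... ADD COLUMN for every source column missing from the target. The comparison itself is plain set logic, so it can be pulled out and tested without a database. The sketch below uses hypothetical names and the same VARCHAR(255) mapping. One deliberate difference: MySQL column names are not case-sensitive, so the sketch compares lowercased names, which avoids a "duplicate column" error if the source and target spell a name with different case.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

class SchemaSync {

    /** Returns source columns absent from the target, in source order (case-insensitive match). */
    static List<String> missingColumns(List<String> sourceColumns, Collection<String> targetColumns) {
        Set<String> existing = new HashSet<>();
        for (String col : targetColumns) {
            existing.add(col.toLowerCase(Locale.ROOT));
        }
        List<String> missing = new ArrayList<>();
        for (String col : sourceColumns) {
            if (!existing.contains(col.toLowerCase(Locale.ROOT))) {
                missing.add(col);
            }
        }
        return missing;
    }

    /** Builds one ALTER TABLE statement per missing column, typed VARCHAR(255) as in the job. */
    static List<String> alterStatements(String fullTable, List<String> missing) {
        List<String> stmts = new ArrayList<>();
        for (String col : missing) {
            stmts.add("ALTER TABLE " + fullTable + " ADD COLUMN `" + col + "` VARCHAR(255)");
        }
        return stmts;
    }

    public static void main(String[] args) {
        // Hypothetical column lists: the source gained an "email" column
        List<String> source = List.of("id", "name", "email");
        List<String> target = List.of("id", "name");
        alterStatements("audit_db.customers_dt", missingColumns(source, target))
                .forEach(System.out::println);
    }
}
```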
 

Audit Logging Strategy

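This ETL uses a cumulative audit model. Each run reads the target_count from the most recent etl_audit_log row for the target table (0 if there is none), adds the number of rows inserted in the current run, and writes a new audit row containing the source and target table names, the source row count, the resulting cumulative target count, the load user, and a load timestamp.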
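
The cumulative rule is simple arithmetic: new target_count = previous target_count (0 when no audit row exists yet) + rows inserted this run. The sketch below checks that rule without a database, using a hypothetical in-memory list (AuditRow) as a stand-in for the etl_audit_log history. It picks the latest row by highest id, mirroring the AUTO_INCREMENT key of the audit table.

```java
import java.util.List;

class CumulativeAudit {

    /** Hypothetical stand-in for one etl_audit_log row. */
    static final class AuditRow {
        final int id;
        final String targetTable;
        final int targetCount;

        AuditRow(int id, String targetTable, int targetCount) {
            this.id = id;
            this.targetTable = targetTable;
            this.targetCount = targetCount;
        }
    }

    /** Latest target_count recorded for the table (highest id), or 0 if there is none. */
    static int previousTargetCount(List<AuditRow> history, String targetTable) {
        int latestId = -1;
        int count = 0;
        for (AuditRow row : history) {
            if (row.targetTable.equals(targetTable) && row.id > latestId) {
                latestId = row.id;
                count = row.targetCount;
            }
        }
        return count;
    }

    /** Previous cumulative count plus the rows inserted in this run. */
    static int cumulativeTargetCount(List<AuditRow> history, String targetTable, int insertedThisRun) {
        return previousTargetCount(history, targetTable) + insertedThisRun;
    }

    public static void main(String[] args) {
        List<AuditRow> history = List.of(
                new AuditRow(1, "audit_db.orders_dt", 100),
                new AuditRow(2, "audit_db.orders_dt", 200));
        // A third run that inserts 100 rows: 200 + 100
        System.out.println(cumulativeTargetCount(history, "audit_db.orders_dt", 100)); // prints 300
    }
}
```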

Audit Benefits

End-to-End ETL Flow

1. tPrejob initializes job execution

2. tLibraryLoad loads the MySQL JDBC driver

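3. tJava creates the target database and data table if needed, adds any new source columns to the target table, copies all source rows, and writes a cumulative audit record

4. The job completes and prints its execution metrics: source count, rows inserted this run, and cumulative target count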
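
Because each run re-reads the whole source table, the printed metrics can double as a simple reconciliation: rows inserted this run should equal the source count, and the cumulative audit count should match the actual row count of the target table. A minimal sketch of such a check follows. The class and method names are hypothetical, and comparing against a separately queried SELECT COUNT(*) on the target is an assumption, not part of the original job.

```java
import java.util.ArrayList;
import java.util.List;

class LoadReconciliation {

    /** Returns human-readable problems; an empty list means the run reconciles. */
    static List<String> check(int sourceCount, int insertedThisRun,
                              int cumulativeTargetCount, int actualTargetCount) {
        List<String> problems = new ArrayList<>();
        if (insertedThisRun != sourceCount) {
            // Can happen if the source changed between the COUNT(*) and the SELECT *
            problems.add("Inserted " + insertedThisRun + " rows but source count was " + sourceCount);
        }
        if (cumulativeTargetCount != actualTargetCount) {
            problems.add("Audit says " + cumulativeTargetCount
                    + " target rows but the table holds " + actualTargetCount);
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = check(500, 500, 1500, 1500);
        System.out.println(problems.isEmpty() ? "Run reconciles" : String.join("; ", problems));
    }
}
```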


Pros and Cons


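Advantages

Teams get full control over SQL logic, schema evolution, and audit tracking without relying on multiple Talend components or licensed ETL tools. All of the logic is transparent and lives in a single tJava component, so it can be read and reviewed as ordinary Java code.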



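Limitations

Inserts run row by row, which can hurt performance on large tables. There is no built-in incremental or CDC logic, so every run re-reads the whole source table and appends all of its rows to the target. All target columns are created as VARCHAR(255), so longer values are rejected or truncated depending on the MySQL SQL mode. Error handling must be written manually.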
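
Row-by-row inserts are the main performance limitation of this pattern. One common mitigation, sketched below under the assumption that the same column list and VARCHAR targets are used, is to bind values with a PreparedStatement and send them in JDBC batches with addBatch and executeBatch. The class name, method names, and batch size are hypothetical. With MySQL Connector/J, adding rewriteBatchedStatements=true to the JDBC URL lets the driver rewrite such batches into multi-row INSERT statements.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Collections;
import java.util.List;

class BatchedCopy {

    /** Builds "INSERT INTO t (`a`,`b`) VALUES (?,?)" for the given columns. */
    static String insertSql(String fullTable, List<String> columns) {
        StringBuilder cols = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                cols.append(",");
            }
            cols.append("`").append(columns.get(i).replace("`", "``")).append("`");
        }
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + fullTable + " (" + cols + ") VALUES (" + placeholders + ")";
    }

    /** Copies every row of src into fullTable, flushing every batchSize rows; returns rows copied. */
    static int copy(ResultSet src, Connection conn, String fullTable,
                    List<String> columns, int batchSize) throws SQLException {
        int inserted = 0;
        try (PreparedStatement ps = conn.prepareStatement(insertSql(fullTable, columns))) {
            int pending = 0;
            while (src.next()) {
                for (int i = 0; i < columns.size(); i++) {
                    String val = src.getString(columns.get(i));
                    if (val == null) {
                        ps.setNull(i + 1, Types.VARCHAR);
                    } else {
                        ps.setString(i + 1, val);
                    }
                }
                ps.addBatch();
                pending++;
                inserted++;
                if (pending == batchSize) {
                    ps.executeBatch();   // send a full batch
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch();       // flush the final partial batch
            }
        }
        return inserted;
    }

    public static void main(String[] args) {
        // Only the SQL builder runs here; copy() needs a live MySQL connection.
        System.out.println(insertSql("audit_db.orders_dt", List.of("id", "name")));
    }
}
```

A batch size in the low thousands is a typical starting point, but the right value depends on row width and server settings such as max_allowed_packet.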


Real-World Use Cases



Conclusion


This Talend ETL pattern demonstrates that robust, auditable pipelines can be built using only tPrejob, tLibraryLoad, and tJava. By leveraging JDBC and metadata-driven logic, teams gain full control, transparency, and flexibility without overengineering their Talend jobs.

Frequently Asked Questions

Q: What are tPrejob, tLibraryLoad, and tJava used for in Talend ETL?

tPrejob runs initialization tasks before the main job starts, tLibraryLoad loads external libraries such as JDBC drivers at runtime, and tJava allows you to write custom Java code for ETL logic including schema discovery, table creation, data loading, and audit logging.

Q: Can I build a full ETL pipeline in Talend with just three components?

Yes. By combining tPrejob, tLibraryLoad, and tJava, you can build a complete dynamic MySQL ETL pipeline that handles schema discovery, table creation, schema synchronization, data loading, and cumulative audit logging using pure JDBC inside tJava.

Q: What are the limitations of using tJava for all ETL logic?

The main limitations include row-by-row inserts that may impact performance, no built-in incremental or CDC logic, all columns stored as VARCHAR by default, and the need for manual error handling.
