使用DTS提供的SDK订阅到的数据为自定义的格式,本文介绍各类SQL语句解析的示例代码。

DDL解析

如果源库执行了DDL操作,记录(Record)的操作类型即为DDL,且DDL语句存储在第一列的值中。获取DDL语句的代码示例如下:

String ddl_string;                   
Record.Type type=record.getOpt();
if(type.equals(Record.Type.DDL)){
    List<DataMessage.Record.Field> fields = record.getFieldList();   
    ddl_string = fields.get(0).getValue().toString();
}

INSERT解析

如果源库执行了INSERT操作,记录的操作类型即为INSERT,获取INSERT完整语句的代码示例如下:

StringBuilder insert_string=new StringBuilder();
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder FieldName=new StringBuilder();
StringBuilder FieldValue = new StringBuilder();
if(type.equals(Record.Type.INSERT)){
       int i=0;
       List<DataMessage.Record.Field> fields = record.getFieldList();   
       for (; i < fields.size(); i++) {
              field = fields.get(i);                                   FieldName.append('`'+field.getFieldname().toLowerCase()+'`');
              FieldValue.append("'"+field.getValue()+"'");
              if (i != fields.size() - 1) {
                      FieldName.append(',');
                      FieldValue.append(',');
              }
        }
        insert_string.append("insert "+ record.getTablename()+"("+FieldName.toString()+") values("+FieldValue.toString()+");");
}

UPDATE解析

如果源库执行了UPDATE操作,记录的操作类型即为UPDATE。数据更新前的字段存储在Record.getFieldList() 中索引为偶数的Field,更新后的字段值存储在索引为奇数的Field。

当执行UPDATE的表具备主键时,获取UPDATE完整语句的代码示例如下:

StringBuilder update_string=new StringBuilder();                   
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder SetValue = new StringBuilder();
StringBuilder WhereCondition = new StringBuilder();
String ConditionStr;
boolean hasPk=false;
boolean pkMode=false;
boolean hasSet=false;
if(type.equals(Record.Type.UPDATE)){                        
    int i=0;
    DataMessage.Record.Field OldField = null;
    DataMessage.Record.Field NewField = null;
    List<DataMessage.Record.Field> fields = record.getFieldList();
    for (; i <fields.size() ; i++) {
        if (i % 2 == 0) {
            OldField = fields.get(i);
            continue;
        }
    NewField = fields.get(i);
    field = NewField;
    if (field.isPrimary()) {
        if (hasPk) {
            WhereCondition.append(" and ");
        }
        //where old value
        ConditionStr = getFieldValue(OldField);
        if(ConditionStr==null){                                         WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`" + " " + "is null");
        }else{
               WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`"+" = "+ "'"+OldField.getValue()+"'");                       
         }
        hasPk = true;
    }
    if (hasSet) {
        SetValue.append(",");
    }
      SetValue.append("`"+field.getFieldname().toLowerCase()+"`" + " = " + "'"+field.getValue()+"'");
    String setStr = getFieldValue(field);
    hasSet = true;
    }
    update_string.append("Update "+record.getTablename() +" Set " + SetValue + " Where "+WhereCondition +";");                        
}
protected String getFieldValue(Field field) throws Exception {
    ByteString byteString = field.getValue();
    if (byteString == null) {
        return null;
    }
    else {
        String value;
        if (field.getType() == com.aliyun.drc.client.message.DataMessage.Record.Field.Type.STRING && field.getEncoding() != null && field.getEncoding() != "ASCII") {
            value = field.getValue().toString(field.getEncoding()); 
        }
        else {
          value = byteString.toString();
        }
        return value;
    }
}

DELETE解析

如果源库执行了DELETE语句,该记录的操作类型为DELETE。当执行DELETE的表具备主键时,获取DELETE完整语句的代码示例如下:

StringBuilder delete_string=new StringBuilder();                   
Record.Type type=record.getOpt();
DataMessage.Record.Field field;
StringBuilder FieldName=new StringBuilder();
StringBuilder FieldValue = new StringBuilder();
StringBuilder DeleteCondition = new StringBuilder();
boolean hasPk=false;
boolean pkMode=false;
if(type.equals(Record.Type.DELETE)){
   int i=0;
   List<DataMessage.Record.Field> fields = record.getFieldList();                     
   delete_string.append("Delete From" + record.getTablename() + "where");                         
   // 表是否有主键?
   if (record.getPrimaryKeys() != null) {
             pkMode = record.getPrimaryKeys().length() > 0 ? true : false;
   }                        
   for (; i < fields.size(); i++) {
            if ((pkMode && !field.isPrimary())) {
                    continue;
            }
            if (hasPk) {
                    delete_string.append(" and ");
            }
            delete_string.append(field.getFieldname() + "=" + field.getValue());
            hasPk = true;
    }
    delete_string.append(";");
}

REPLACE解析

如果源库执行了REPLACE操作,该记录的操作类型即为UPDATE或INSERT。

  • 当REPLACE设置的值不存在时,该记录的操作类型为INSERT。
  • 当REPLACE设置的值存在时,该记录的操作类型为UPDATE。

BEGIN解析

如果源库执行了BEGIN操作,该记录的操作类型即为BEGIN。由于BEGIN语句没有实际的内容,只需要判断操作类型,无需对Field进行处理,代码示例如下:

StringBuilder sql_string = new StringBuilder();
Record.Type type = record.getOpt();
if(type.equals(Record.Type.BEGIN)){
        sql_string.append("Begin");
}

COMMIT解析

如果源库执行了COMMIT操作,该记录的操作类型即为COMMIT。由于COMMIT语句没有实际的内容,只需要判断操作类型,无需对Field进行处理,代码示例如下:

StringBuilder sql_string = new StringBuilder();
Record.Type type = record.getOpt();
if(type.equals(Record.Type.COMMIT)){
        sql_string.append("commit");
}