使用DTS提供的SDK订阅到的数据为自定义的格式,本文介绍各类SQL语句解析的示例代码。
DDL解析
如果源库执行了DDL操作,记录(Record)的操作类型即为DDL,且DDL语句存储在第一列的值中。获取DDL语句的代码示例如下:
String ddl_string; Record.Type type=record.getOpt(); if(type.equals(Record.Type.DDL)){ List<DataMessage.Record.Field> fields = record.getFieldList(); ddl_string = fields.get(0).getValue().toString(); }
INSERT解析
如果源库执行了INSERT操作,记录的操作类型即为INSERT,获取INSERT完整语句的代码示例如下:
StringBuilder insert_string=new StringBuilder(); Record.Type type=record.getOpt(); DataMessage.Record.Field field; StringBuilder FieldName=new StringBuilder(); StringBuilder FieldValue = new StringBuilder(); if(type.equals(Record.Type.INSERT)){ int i=0; List<DataMessage.Record.Field> fields = record.getFieldList(); for (; i < fields.size(); i++) { field = fields.get(i); FieldName.append('`'+field.getFieldname().toLowerCase()+'`'); FieldValue.append("'"+field.getValue()+"'"); if (i != fields.size() - 1) { FieldName.append(','); FieldValue.append(','); } } insert_string.append("insert "+ record.getTablename()+"("+FieldName.toString()+") values("+FieldValue.toString()+");"); }
UPDATE解析
如果源库执行了UPDATE操作,记录的操作类型即为UPDATE。数据更新前的字段存储在Record.getFieldList()
中索引为偶数的Field,更新后的字段值存储在索引为奇数的Field。
当执行UPDATE的表具备主键时,获取UPDATE完整语句的代码示例如下:
StringBuilder update_string=new StringBuilder(); Record.Type type=record.getOpt(); DataMessage.Record.Field field; StringBuilder SetValue = new StringBuilder(); StringBuilder WhereCondition = new StringBuilder(); String ConditionStr; boolean hasPk=false; boolean pkMode=false; boolean hasSet=false; if(type.equals(Record.Type.UPDATE)){ int i=0; DataMessage.Record.Field OldField = null; DataMessage.Record.Field NewField = null; List<DataMessage.Record.Field> fields = record.getFieldList(); for (; i <fields.size() ; i++) { if (i % 2 == 0) { OldField = fields.get(i); continue; } NewField = fields.get(i); field = NewField; if (field.isPrimary()) { if (hasPk) { WhereCondition.append(" and "); } //where old value ConditionStr = getFieldValue(OldField); if(ConditionStr==null){ WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`" + " " + "is null"); }else{ WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`"+" = "+ "'"+OldField.getValue()+"'"); } hasPk = true; } if (hasSet) { SetValue.append(","); } SetValue.append("`"+field.getFieldname().toLowerCase()+"`" + " = " + "'"+field.getValue()+"'"); String setStr = getFieldValue(field); hasSet = true; } update_string.append("Update "+record.getTablename() +" Set " + SetValue + " Where "+WhereCondition +";"); } protected String getFieldValue(Field field) throws Exception { ByteString byteString = field.getValue(); if (byteString == null) { return null; } else { String value; if (field.getType() == com.aliyun.drc.client.message.DataMessage.Record.Field.Type.STRING && field.getEncoding() != null && field.getEncoding() != "ASCII") { value = field.getValue().toString(field.getEncoding()); } else { value = byteString.toString(); } return value; } }
DELETE解析
如果源库执行了DELETE语句,该记录的操作类型为DELETE。当执行DELETE的表具备主键时,获取DELETE完整语句的代码示例如下:
StringBuilder delete_string=new StringBuilder(); Record.Type type=record.getOpt(); DataMessage.Record.Field field; StringBuilder FieldName=new StringBuilder(); StringBuilder FieldValue = new StringBuilder(); StringBuilder DeleteCondition = new StringBuilder(); boolean hasPk=false; boolean pkMode=false; if(type.equals(Record.Type.DELETE)){ int i=0; List<DataMessage.Record.Field> fields = record.getFieldList(); delete_string.append("Delete From" + record.getTablename() + "where"); // 表是否有主键? if (record.getPrimaryKeys() != null) { pkMode = record.getPrimaryKeys().length() > 0 ? true : false; } for (; i < fields.size(); i++) { if ((pkMode && !field.isPrimary())) { continue; } if (hasPk) { delete_string.append(" and "); } delete_string.append(field.getFieldname() + "=" + field.getValue()); hasPk = true; } delete_string.append(";"); }
REPLACE解析
如果源库执行了REPLACE操作,该记录的操作类型即为UPDATE或INSERT。
- 当REPLACE设置的值不存在时,该记录的操作类型为INSERT。
- 当REPLACE设置的值存在时,该记录的操作类型为UPDATE。
BEGIN解析
如果源库执行了BEGIN操作,该记录的操作类型即为BEGIN。由于BEGIN语句没有实际的内容,只需要判断操作类型,无需对Field进行处理,代码示例如下:
StringBuilder sql_string = new StringBuilder(); Record.Type type = record.getOpt(); if(type.equals(Record.Type.BEGIN)){ sql_string.append("Begin"); }
COMMIT解析
如果源库执行了COMMIT操作,该记录的操作类型即为COMMIT。由于COMMIT语句没有实际的内容,只需要判断操作类型,无需对Field进行处理,代码示例如下:
StringBuilder sql_string = new StringBuilder(); Record.Type type = record.getOpt(); if(type.equals(Record.Type.COMMIT)){ sql_string.append("commit"); }