// Fragment: uploadSession is an already-created TableTunnel.UploadSession.
// Create an empty record shaped like the target table.
ArrayRecord record = (ArrayRecord) uploadSession.newRecord();
// prepare data
// Long elements, to agree with the typed read getArray(Long.class, 0) in the download example.
// NOTE(review): presumably col0 is ARRAY<BIGINT> - confirm against the table DDL.
List<Long> arrayData = Arrays.asList(1L, 2L, 3L);
Map<String, Long> mapData = new HashMap<String, Long>();
mapData.put("a", 1L);
mapData.put("c", 2L);
List<Object> structData = new ArrayList<Object>();
structData.add("Lily");
// NOTE(review): the value must match the declared type of the second struct field;
// use 18L if that field is BIGINT - confirm against the table DDL.
structData.add(18);
// set data to record
record.setArray(0, arrayData);
record.setMap(1, mapData);
// The struct value is built from the type information of column 2 in the session's schema.
record.setStruct(2, new SimpleStruct(
    (StructTypeInfo) uploadSession.getSchema().getColumn(2).getTypeInfo(), structData));
// write the record, always closing the writer even if the write fails
RecordWriter recordWriter = uploadSession.openRecordWriter(0);
try {
  recordWriter.write(record);
} finally {
  recordWriter.close();
}
从MaxCompute下载复杂类型数据
代码示例:
RecordReader recordReader = downloadSession.openRecordReader(0, 1);
// read the record
ArrayRecord record1 = (ArrayRecord)recordReader.read();
// get array field data
List field0 = record1.getArray(0);
List longField0 = record1.getArray(Long.class, 0);
// get map field data
Map field1 = record1.getMap(1);
Map typedField1 = record1.getMap(String.class, Long.class, 1);
// get struct field data
Struct field2 = record1.getStruct(2);
运行实例
完整代码如下:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.data.SimpleStruct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.StructTypeInfo;
/**
 * Sample that writes one record with ARRAY, MAP and STRUCT columns to a table through
 * the Tunnel upload session, then reads one record back through a download session and
 * extracts the three complex-typed fields.
 *
 * <p>Fill in the connection settings below before running.
 */
public class TunnelComplexTypeSample {
  private static String accessId = "";
  private static String accessKey = "";
  private static String odpsUrl = "";
  private static String project = "";
  private static String table = "";
  // partitions of a partitioned table, eg: "pt='1',ds='2'"
  // leave empty if the table is not a partitioned table
  private static String partition = "";

  /**
   * Runs the upload followed by the download and prints a success line for each phase.
   * Tunnel and I/O failures are printed to standard error.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    // An empty partition setting means "not a partitioned table". In that case no
    // PartitionSpec is built and the table-level session overloads are used instead.
    // NOTE(review): PartitionSpec is expected to reject an empty spec string - confirm
    // against the SDK version in use.
    boolean partitioned = partition != null && !partition.trim().isEmpty();
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      PartitionSpec partitionSpec = partitioned ? new PartitionSpec(partition) : null;

      // ---------- Upload Data ---------------
      // create upload session for table
      // the table schema is {"col0": ARRAY, "col1": MAP, "col2": STRUCT};
      // the typed reads further down imply ARRAY<BIGINT> and MAP<STRING, BIGINT>.
      UploadSession uploadSession = partitioned
          ? tunnel.createUploadSession(project, table, partitionSpec)
          : tunnel.createUploadSession(project, table);
      // get table schema
      TableSchema schema = uploadSession.getSchema();
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();

      // prepare data
      // Long elements, to agree with getArray(Long.class, 0) in the download phase.
      List<Long> arrayData = Arrays.asList(1L, 2L, 3L);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);
      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      // NOTE(review): must match the declared type of the second struct field;
      // use 18L if that field is BIGINT - confirm against the table DDL.
      structData.add(18);

      // set data to record
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(),
          structData));

      // open record writer on block 0, write the record, and always close the writer
      RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      try {
        recordWriter.write(record);
      } finally {
        recordWriter.close();
      }
      // commit uploadSession for block 0, the upload finish
      uploadSession.commit(new Long[]{0L});
      System.out.println("upload success!");

      // ---------- Download Data ---------------
      // create download session for the same table
      DownloadSession downloadSession = partitioned
          ? tunnel.createDownloadSession(project, table, partitionSpec)
          : tunnel.createDownloadSession(project, table);
      // open record reader, read one record here for example
      RecordReader recordReader = downloadSession.openRecordReader(0, 1);
      try {
        // read the record
        ArrayRecord record1 = (ArrayRecord) recordReader.read();
        // NOTE(review): read() presumably returns null when no record is available.
        if (record1 == null) {
          System.err.println("download failed: no record was returned");
          return;
        }
        // get array field data: untyped view, then element-typed view
        List<?> field0 = record1.getArray(0);
        List<Long> longField0 = record1.getArray(Long.class, 0);
        // get map field data: untyped view, then key/value-typed view
        Map<?, ?> field1 = record1.getMap(1);
        Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);
        // get struct field data
        Struct field2 = record1.getStruct(2);
      } finally {
        // release the reader whether or not the read succeeded
        recordReader.close();
      }
      System.out.println("download success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}