packagejdbcConnection;importio.vertx.core.AbstractVerticle;importio.vertx.core.AsyncResult;importio.vertx.core.Future;importio.vertx.core.Handler;importio.vertx.core.json.JsonArray;importio.vertx.core.json.JsonObject;importio.vertx.ext.asyncsql.MySQLClient;importio.vertx.ext.sql.SQLClient;importio.vertx.ext.sql.SQLConnection;importio.vertx.ext.sql.SQLRowStream;importjava.util.ArrayList;importjava.util.List;/*** Created by sweet on 2017/9/13.*/

public class MyFirstVerticle extendsAbstractVerticle {private String sql = "SELECT id, name FROM t_school WHERE id IN (";privateSQLClient sqlClient;public static voidmain(String[] args) {

Runner.runExample(MyFirstVerticle.class);

}

@Overridepublic void start(Future startFuture) throwsException {

JsonObject c= newJsonObject();//配置数据库连接

c.put("username", "root").put("password", "1367356")

.put("host","192.168.100.91").put("database", "user");

sqlClient=MySQLClient.createShared(vertx, c);

Future sqlConnectionFuture =Future.future();

sqlClient.getConnection(sqlConnectionFuture);

sqlConnectionFuture.setHandler(connection->{if(connection.succeeded()) {

SQLConnection conn=connection.result();

Future streamFuture1 =Future.future();

Future> streamFuture2 =Future.future();

Future> future =Future.future();//public interface Future extends AsyncResult, Handler>

conn.queryStream("SELECT id, name, t_school_id FROM t_user", streamFuture1);//前面sql语句执行的结果,交到streamFuture1容器里,或者由handler处理,Future是继承了Handler>的。//T是SQLRowStream,streamFuture1是Future。

streamFuture1.compose(sqlRowStream -> { //处理sqlRowStream//System.out.println(sqlRowStream.column("id"));//0//System.out.println(sqlRowStream.column("name"));//1

List users = new ArrayList<>();

sqlRowStream.handler(jsonArray-> { //sqlRowStream,转换为了jsonArray。["23","lisi","yuhuange"]

System.out.println(jsonArray);

users.add(newUser(jsonArray));

});

System.out.println("user size: " + users.size()); //1

streamFuture2.complete(users);

}, streamFuture2)//处理完成,有一个结果Future,继续处理。

.compose(users -> { //处理users

List list = new ArrayList<>();

JsonArray collect=users.parallelStream()

.map(User::getSchoolId)

.collect(JsonArray::new, JsonArray::add, JsonArray::addAll);

String sql2= sql += Utils.placeholder("?", collect.size(), ", ") + ")";

conn.queryStreamWithParams(sql2, collect, schoolResult->{if(schoolResult.failed()){

schoolResult.cause().printStackTrace();return;

}

schoolResult.result().handler(jsonArray1-> { //将schoolResult转化为json数据进行处理

if (jsonArray1 != null && jsonArray1.size() > 0)

list.add(jsonArray1);

});

future.complete(list);

});

}, future);

future.setHandler(list-> { //异步结果list

conn.close(); //关闭流

if(list.failed()) {

list.cause().printStackTrace();return;

}

System.out.println("-----");

list.result().forEach(System.out::println);

});

}else{

connection.cause().printStackTrace();

System.err.println(connection.cause().getMessage());

}

});

startFuture.complete();

}

}

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐