-ความจริงของ Reactive คืออะไร?
-สิ่งที่น่าสนใจของ Reactive ใน Java EE
-สิ่งที่ Reactive ทำได้ใน Java SE 8?
Reactive: What’s in a Name?
“Reactive” คือ การแก้อุปสรรค์ใหญ่ เพื่อนำไปใช้งานในวงกว้างของนักพัฒนา
หลักความเสียงร่วมกัน จากความกังวลด้านการตลาด มีปัจจัย ดังนี้
-เหตุการณ์ที่ขับเคลื่อน (Event driven)
-ความไม่ตรงกัน (Asynchronous)
-การไม่ปิดกั้น (Non-blocking)
-Message driven
*** บางคนกังวลมากเกินไปเกี่ยวกับการใช้ Reactive ในเรื่อง Responsive, resilient, elastic, adaptive, fault-tolerant, scalable และอื่นๆ จากหลักการใช้งาน
มีอะไรที่เป็นเรื่องใหญ่ใน Reactive?
Reactive ได้เสนอเทคนิควิศวกรรมซอฟแวร์ที่สำคัญ คือ
-การสร้างประสบการณ์การใช้งาน และการตอบสนองการใช้งานที่มากขึ้น
-throughput สูง , เน้นฮาร์ดแวร์ที่ดีที่สุด และการใช้งาน CPU
-การประมวลผลเหตุการณ์ที่ซับซ้อน
-การใช้งานกับ Internet of Things (IoT), device-to-device communication
-การใช้งานมือถือ ที่มีฐานผู้ใช้ขนาดใหญ่ระดับโลกพร้อมกันกับการใช้งานที่มากขึ้น
Reactive ไม่จำเป็นต้องเป็นยารักษาระบบแบบครอบจักรวาล
-Reactive ใช้หลัก Asynchronous (เหตุการณ์ที่ขับเคลื่อนด้วยรหัสอยู่เสมอ)
-เน้นขยายขีดความสามารถของฮาร์ดแวร์ และการสร้างคำตอบ เพื่อบำรุงรักษาระบบมากขึ้น
JMS and Message Driven Beans
JMS หนึ่งของ API ที่เก่าแก่ที่สุดใน Java EE สอดคล้องอย่างยิ่งกับหลัก Reactive คือ
-Message oriented middleware
-Message/event driven, asynchronous, loosely coupled, reliable, transactional, secure, durable, fault tolerant, error tolerant, clustered, monitored, administered
-Queues (point-to-point) และ topics (publish-subscribe)
-Data (body) และ metadata (properties filterable โดย selectors)
Message Driven Beans หลักสำหรับการจัดการข้อความ JMS ใน Java EE
-เพียงแค่สร้าง POJOs ด้วย annotations (metadata)
-Secure, transactional, thread-safe, pooled, reliable, monitored, load-balanced, fault-tolerant, error-tolerant
JMS Send
@Inject JMSContext jmsContext;
@Resource(lookup = "jms/HandlingEventRegistrationAttemptQueue")
Destination handlingEventQueue;
...
public void receivedHandlingEventRegistrationAttempt(HandlingEventRegistrationAttempt attempt)
{
...
jmsContext.createProducer()
.setDeliveryMode(DeliveryMode.PERSISTENT) // The default :-)
.setPriority(LOW_PRIORITY)
.setDisableMessageID(true)
.setDisableMessageTimestamp(true)
.setStringProperty("source", source)
.send(handlingEventQueue, attempt);
}
JMS MDB
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/HandlingEventRegistrationAttemptQueue"),
@ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "source = 'mobile'")})
public class HandlingEventRegistrationAttemptConsumer implements MessageListener {
...
public void onMessage(Message message) {
...
HandlingEventRegistrationAttempt attempt = message.getBody(HandlingEventRegistrationAttempt.class);
...
}
}
Even กับ JMS 2.1?
@ApplicationScoped
@MaxConcurrency(10)
public class HandlingEventRegistrationAttemptConsumer {
@JmsListener(
destinationLookup="jms/HandlingEventRegistrationAttemptQueue", selector="source = 'mobile'", batchSize=10, retry=5, retryDelay=7000, orderBy=TIMESTAMP)
public void onEventRegistrationAttempt(HandlingEventRegistrationAttempt... attempts) {
...
}
}
Asynchronous Session Beans
Dead simple asynchrony ที่ระดับ component
-เพียงใส่ annotation ประกอบใน POJO
-ประเภทผลตอบแทนที่เรียบง่าย : void (fire-and-forget) หรือ Future<V> (Client ประมวลผลไม่ตรงกัน)
Reactive ตอบสนองความต้องการที่มีมากขึ้นตลอด หรือการตอบสนองมีมากขึ้น
-การทำธุรกรรม, ความปลอดภัย, การตรวจสอบ, thread-safe, การสำรอง
-Not loosely coupled, persistent, fault tolerant หรือ error tolerant (client ต้องชัดเจนในการจัดการข้อผิดพลาด)
Asynchronous Session Bean
@Stateless
public class ReportGeneratorService {
@Asynchronous
public Future<Report> generateReport(ReportParameters params){
try{
Report report = renderReport(params);
return new AsyncResult(report);
} catch(ReportGenerationException e) {
return new AsyncResult(new ErrorReport(e));
}
@Asynchronous
public void processPayment(Payment payment){
// CPU/IO heavy tasks to process a payment
}
Asynchronous Session Bean Client
@Inject ReportGeneratorService reportGeneratorService;
...
Future<Report> future = reportGeneratorService.generateReport(parameters);
...
if (future.isDone()) {
Report report = future.get();
...
}
...
future.cancel(true);
@Asynchronous + CompletableFuture?
@Asynchronous
public CompletableFuture<Confirmation> processPayment(Order order) {
...
Confirmation status = ...;
return
CompletableFuture<Confirmation>.completedFuture(status);
}
paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));
CDI Events/Observers
-เน้นกะทัดรัด, ง่าย, สง่า, เหตุการณ์ชนิดปลอดภัย
-เป็นหลักการสังเกตการณ์อย่างเป็นทางการผ่านทาง DI framework กับคำอธิบายประกอบ (annotations)
-นำเสนอโซลูชั่นที่ดีในการ loose-coupling และ ชนิด type-safe filtering/การผูกมัด รวมทั้ง asynchrony แบบ one-to-one หรือ one-to-many
CDI Events
@Inject @CargoInspected Event<Cargo> cargoInspected;
...
public void inspectCargo(TrackingId trackingId) {
...
cargoInspected.fire(cargo);
}
public void onCargoInspected(
@Observes @CargoInspected Cargo cargo) {
@Qualifier
@Retention(RUNTIME) @Target({FIELD, PARAMETER})
public @interface CargoInspected {}
Asynchronous CDI Events?
@Inject @CargoInspected Event<Cargo> cargoInspected;
...
public void inspectCargo(TrackingId trackingId) {
...
cargoInspected.fireAsync(cargo);
}
public void onCargoInspected(@Observes(async=true) @CargoInspected Cargo cargo) {
Asynchronous Servlets and NIO
-เพิ่มขนาดการใช้งาน Asynchronous Servlets
-Decouple connection จากการ request thread
-Return request thread กลับไปที่ pool
-การควบคุม IO/CPU เพื่อควบคุมการทำงานหนักในฝั่ง backend ที่แยกต่างหาก
-การปิดการเชื่อมต่อแคชเมื่อดำเนินการเสร็จ
NIO จะ removes possible thread blocks ในช่วงการ read/write ที่ช้า
-จะได้รับแจ้งเมื่อช่อง IO พร้อม
-โดยเฉพาะการ read/write เมื่อ IO channel พร้อม
-จำเป็นต้องเห็นการทำงานขณะ Servlet IO ทำงานหนัก มิฉะนั้นการแก้ปัญหาที่ซับซ้อนจะยากมาก
Asynchronous Servlet
@WebServlet(urlPatterns={"/report"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
public void doGet(HttpServletRequest request,
HttpServletResponse response) {
...
final AsyncContext asyncContext = request.startAsync();
asyncContext.start(() -> {
ReportParameters parameters = parseReportParameters(asyncContext.getRequest());
Report report = generateReport(parameters);
printReport(report, asyncContext.getResponse());
asyncContext.complete();
});
}
}
Asynchronous Servlet NIO (Output Stream)
private void printReport(Report report,AsyncContext context) {
ServletOutputStream output = context.getResponse().getOutputStream();
WriteListener writeListener = new ReportWriteListener(output, report, context);
output.setWriteListener(writeListener);
}
Asynchronous Servlet NIO (Write Listener)
class ReportWriteListener implements WriteListener {
private ServletOutputStream output = null;
private InputStream input = null;
private AsyncContext context = null;
ReportWriteListener(ServletOutputStream output, Report report, AsyncContext context) {
this.output = output;
this.input = report.asPdfStream();
this.context = context;
}
...
public void onWritePossible() throws IOException {
byte[] chunk = new byte[256];
int read = 0;
while (output.isReady() && (read = input.read(chunk)) != -1)
output.write(chunk, 0, read);
if (read == -1)
context.complete();
}
public void onError(Throwable t) {
context.complete();
t.printStackTrace();
}
}
Asynchronous JAX-RS
-ความสามารถของ Asynchronous ถูกเพิ่มเข้ามาเพื่อ JAX-RS 2 / Java EE 7 ทั้งบน server และ client side
-ความสามารถ API ในการ async แบบสมมาตร จะสนับสนุนทั้ง Futures และการ callbacks
Asynchronous JAX-RS Resource
@Stateless
@Path("/reports")
public class ReportsResource {
...
@Path("{id}")
@GET
@Produces({"application/pdf"})
@Asynchronous
public void generateReport(
@PathParam("id") Long id,
@Suspended AsyncResponse response) {
ResponseBuilder builder = Response.ok(renderReport(id));
builder.header("Content-Disposition","attachment; filename=report.pdf");
response.resume(builder.build());
}
}
Asynchronous JAX-RS Client
WebTarget target = client.target("http://.../balance")...
Future<Double> future = target.request().async().get(Double.class));
...
Double balance = future.get();
WebTarget target = client.target("http://.../balance")...
target.request().async().get(
new InvocationCallback<Double>() {
public void complete(Double balance) {
// Process balance
}
public void failed(InvocationException e) {
// Process error
}
});
Asynchrony/NIO in WebSocket
-มีการทำงาน (Endpoint) โดยปลายทาง WebSocket ไม่ปิดกั้น
-ไม่มีการเชื่อมต่อในครั้งแรก ทั้ง server และ client side
-การเขียน / การส่งข้อมูล สามารถทำแบบ asynchronously เพื่อเพิ่มวิธีส่งผ่านข้อมูลที่ดีกว่า
-API สมมาตรมาก สำหรับวิธี sync และแบบ async
-สนับสนุน Futures callbacks
Asynchronous Remote WebSocket Endpoint
@ServerEndpoint(value = "/chat"...)
@Singleton
public class ChatServer {
private Set<Session> peers = new HashSet<>();
@OnOpen
public void onOpen(Session peer) {
peers.add(peer);
}
@OnClose
public void onClose(Session peer) {
peers.remove(peer);
}
@OnMessage
public void onMessage(ChatMessage message) {
for (Session peer : peers) {
...peer.getAsyncRemote().sendObject(message)...
}
}
}
Future vs Callback
Future<Void> future = peer.getAsyncRemote().sendObject(message);
peer.getAsyncRemote().sendObject(message,
(SendResult result) -> {
...
if (!result.isOK()) {
...result.getException()...
}
...
});
Java EE Concurrency Utilities
-ช่วยให้การในระดับต่ำกว่าความสามารถในการ Threading และการ asynchronous ใน Java EE เรื่องความปลอดภัย และความน่าเชื่อถือ
-สร้างรหัสพิเศษมาก และปริมาณงานสามารถกำหนดเอง
การขยายขนาดของ Java SE Concurrency Utilities
-ManagedExecutorService
-ManagedThreadFactory
Managed Executor Service
@Path("/reports")
public class ReportsResource {
@Resource ManagedExecutorService executor;
...
@Path("{id}")
@GET
@Produces({"application/pdf"})
public void generateReport(
@PathParam("id") Long id,
@Suspended AsyncResponse response) {
executor.execute(() -> {
ResponseBuilder builder = Response.ok(renderReport(id));
builder.header("Content-Disposition","attachment; filename=report.pdf");
response.resume(builder.build());
}
}
}
Executor Service API
public interface ManagedExecutorService extends ExecutorService {
public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public Future<?> submit(Runnable task);
public <T> Future<T> submit(Runnable task, T result);
...
}
Java SE 8 Completable Future
-ทั้ง Futures และ callbacks มีข้อบกพร่องร้ายแรงโดยเฉพาะอย่างยิ่ง Reactive code
-Java SE 8 สนับสนุนการทำงานเพื่ออนาคตที่ดีขึ้นอย่างมีนัยสำคัญ สำหรับ Reactive programming
-Non-blocking, event-driven, composable และ functional (รวมทั้ง lambdas)
-ง่ายต่อการรวมกับ Java EE 7 (Java EE และ Java SE)
The Problem with Futures (and Callbacks)
Person p = ... Future<Assets> f1 = executor.submit(() -> getAssets(p)); Future<Liabilities> f2 = executor.submit(() -> getLiabilities(p)); Future<Credit> f3 = executor.submit(() -> calculateCreditScore(f1.get(), f2.get())); // The unrelated calls below are now blocked for no reason. Future<History> f4 = executor.submit(() -> getHealthHistory(p)); Future<Health> f5 = executor.submit(() -> calculateHeathScore(f4.get())); // Unrelated paths join below. Future<Coverage> f6 = executor.submit(() -> underwrite(f3.get(), f5.get()));
ตัวอย่างการใช้งาน : https://github.com/m-reza-rahman/reactive_javaee/blob/master/CallbackHell.java
CompletableFuture Basics
@Resource ManagedExecutorService executor;
...
public CompletableFuture<Confirmation> processPayment(Order order) {
CompletableFuture<Confirmation> future = new CompletableFuture<>();
executor.execute(() -> {
Confirmation status = ...
future.complete(status);
});
return future;
}
paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));
Functional Reactive to the Rescue?
CompletableFuture<Assets> getAssets = CompletableFuture.supplyAsync(() -> getAssets(person), executor); CompletableFuture<Liabilities> getLiabilities = CompletableFuture.supplyAsync(() -> getLiabilities(person), executor); CompletableFuture<Credit> calculateCreditScore = getAssets.thenCombineAsync(getLiabilities,(assets, liabilities) -> calculateCreditScore(assets, liabilities), executor); CompletableFuture<Health> calculateHeathScore = CompletableFuture.supplyAsync(() -> getHealthHistory(person), executor).thenApplyAsync(history -> calculateHeathScore(history), executor); Coverage coverage = calculateCreditScore.thenCombineAsync(calculateHeathScore,(credit, health) -> underwrite(credit, health), executor).join();
More Possibilities…
Reactive JPA?
-Async/NIO สนับสนุนในการอ้างอิง database driver/JDBC
-specialized thread pool
CompletableFuture<List<Country>> countries = em.createQuery("SELECT c FROM Country c", Country.class).async().getResultList();
Reactive MVC?
-คล้ายกับรูปแบบพื้นฐานใน JAX-RS/Servlet
-EJB async เป็นอีกรูปแบบที่เป็นไปได้
-แนวคิด Reactive JSF ทำได้ยากลำบาก
Summary
-Reactive programming มีเทคนิคดีขึ้น และอาจมีความสำคัญมากขึ้นในอนาคตอันใกล้
-Java EE มีการสนับสนุนสำหรับ Reactive model
-Reactive จะดีมากยิ่งขึ้นด้วย Java EE 8
-Java SE 8 ช่วยให้การเขียนโปรแกรมง่ายขึ้น
-ต้องระวังไว้ว่า การเขียนโปรแกรมแบบ Reactive เป็นวิธีที่ง่ายต่อการใช้งาน
Resources
-Java EE Tutorials
http://docs.oracle.com/javaee/7/tutorial/doc/home.htm
-Java SE Tutorials
http://docs.oracle.com/javase/tutorial/
-Digging Deeper
http://docs.oracle.com/javaee/7/firstcup/doc/home.htm
https://glassfish.java.net/hol/
http://cargotracker.java.net
-Java EE Transparent Expert Groups
http://javaee-spec.java.net
-Java EE Reference Implementation
http://glassfish.org
-The Aquarium
http://blogs.oracle.com/theaquarium
Reference : เอกสาร Reactive Java EE - Let Me Count the Ways! โดย Reza Rahman (Java EE/GlassFish Evangelist) จาก Oracle ในงาน JavaOne

ไม่มีความคิดเห็น :
แสดงความคิดเห็น