-ความจริงของ Reactive คืออะไร?
-สิ่งที่น่าสนใจของ Reactive ใน Java EE
-สิ่งที่ Reactive ทำได้ใน Java SE 8?
Reactive: What’s in a Name?
“Reactive” คือ การแก้อุปสรรค์ใหญ่ เพื่อนำไปใช้งานในวงกว้างของนักพัฒนา
หลักความเสียงร่วมกัน จากความกังวลด้านการตลาด มีปัจจัย ดังนี้
-เหตุการณ์ที่ขับเคลื่อน (Event driven)
-ความไม่ตรงกัน (Asynchronous)
-การไม่ปิดกั้น (Non-blocking)
-Message driven
*** บางคนกังวลมากเกินไปเกี่ยวกับการใช้ Reactive ในเรื่อง Responsive, resilient, elastic, adaptive, fault-tolerant, scalable และอื่นๆ จากหลักการใช้งาน
มีอะไรที่เป็นเรื่องใหญ่ใน Reactive?
Reactive ได้เสนอเทคนิควิศวกรรมซอฟแวร์ที่สำคัญ คือ
-การสร้างประสบการณ์การใช้งาน และการตอบสนองการใช้งานที่มากขึ้น
-throughput สูง , เน้นฮาร์ดแวร์ที่ดีที่สุด และการใช้งาน CPU
-การประมวลผลเหตุการณ์ที่ซับซ้อน
-การใช้งานกับ Internet of Things (IoT), device-to-device communication
-การใช้งานมือถือ ที่มีฐานผู้ใช้ขนาดใหญ่ระดับโลกพร้อมกันกับการใช้งานที่มากขึ้น
Reactive ไม่จำเป็นต้องเป็นยารักษาระบบแบบครอบจักรวาล
-Reactive ใช้หลัก Asynchronous (เหตุการณ์ที่ขับเคลื่อนด้วยรหัสอยู่เสมอ)
-เน้นขยายขีดความสามารถของฮาร์ดแวร์ และการสร้างคำตอบ เพื่อบำรุงรักษาระบบมากขึ้น
JMS and Message Driven Beans
JMS หนึ่งของ API ที่เก่าแก่ที่สุดใน Java EE สอดคล้องอย่างยิ่งกับหลัก Reactive คือ
-Message oriented middleware
-Message/event driven, asynchronous, loosely coupled, reliable, transactional, secure, durable, fault tolerant, error tolerant, clustered, monitored, administered
-Queues (point-to-point) และ topics (publish-subscribe)
-Data (body) และ metadata (properties filterable โดย selectors)
Message Driven Beans หลักสำหรับการจัดการข้อความ JMS ใน Java EE
-เพียงแค่สร้าง POJOs ด้วย annotations (metadata)
-Secure, transactional, thread-safe, pooled, reliable, monitored, load-balanced, fault-tolerant, error-tolerant
JMS Send
@Inject JMSContext jmsContext; @Resource(lookup = "jms/HandlingEventRegistrationAttemptQueue") Destination handlingEventQueue; ... public void receivedHandlingEventRegistrationAttempt(HandlingEventRegistrationAttempt attempt) { ... jmsContext.createProducer() .setDeliveryMode(DeliveryMode.PERSISTENT) // The default :-) .setPriority(LOW_PRIORITY) .setDisableMessageID(true) .setDisableMessageTimestamp(true) .setStringProperty("source", source) .send(handlingEventQueue, attempt); }
JMS MDB
@MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/HandlingEventRegistrationAttemptQueue"), @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "source = 'mobile'")}) public class HandlingEventRegistrationAttemptConsumer implements MessageListener { ... public void onMessage(Message message) { ... HandlingEventRegistrationAttempt attempt = message.getBody(HandlingEventRegistrationAttempt.class); ... } }
Even กับ JMS 2.1?
@ApplicationScoped @MaxConcurrency(10) public class HandlingEventRegistrationAttemptConsumer { @JmsListener( destinationLookup="jms/HandlingEventRegistrationAttemptQueue", selector="source = 'mobile'", batchSize=10, retry=5, retryDelay=7000, orderBy=TIMESTAMP) public void onEventRegistrationAttempt(HandlingEventRegistrationAttempt... attempts) { ... } }
Asynchronous Session Beans
Dead simple asynchrony ที่ระดับ component
-เพียงใส่ annotation ประกอบใน POJO
-ประเภทผลตอบแทนที่เรียบง่าย : void (fire-and-forget) หรือ Future<V> (Client ประมวลผลไม่ตรงกัน)
Reactive ตอบสนองความต้องการที่มีมากขึ้นตลอด หรือการตอบสนองมีมากขึ้น
-การทำธุรกรรม, ความปลอดภัย, การตรวจสอบ, thread-safe, การสำรอง
-Not loosely coupled, persistent, fault tolerant หรือ error tolerant (client ต้องชัดเจนในการจัดการข้อผิดพลาด)
Asynchronous Session Bean
@Stateless public class ReportGeneratorService { @Asynchronous public Future<Report> generateReport(ReportParameters params){ try{ Report report = renderReport(params); return new AsyncResult(report); } catch(ReportGenerationException e) { return new AsyncResult(new ErrorReport(e)); }
@Asynchronous public void processPayment(Payment payment){ // CPU/IO heavy tasks to process a payment }
Asynchronous Session Bean Client
@Inject ReportGeneratorService reportGeneratorService; ... Future<Report> future = reportGeneratorService.generateReport(parameters); ... if (future.isDone()) { Report report = future.get(); ... } ... future.cancel(true);
@Asynchronous + CompletableFuture?
@Asynchronous public CompletableFuture<Confirmation> processPayment(Order order) { ... Confirmation status = ...; return CompletableFuture<Confirmation>.completedFuture(status); } paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));
CDI Events/Observers
-เน้นกะทัดรัด, ง่าย, สง่า, เหตุการณ์ชนิดปลอดภัย
-เป็นหลักการสังเกตการณ์อย่างเป็นทางการผ่านทาง DI framework กับคำอธิบายประกอบ (annotations)
-นำเสนอโซลูชั่นที่ดีในการ loose-coupling และ ชนิด type-safe filtering/การผูกมัด รวมทั้ง asynchrony แบบ one-to-one หรือ one-to-many
CDI Events
@Inject @CargoInspected Event<Cargo> cargoInspected; ... public void inspectCargo(TrackingId trackingId) { ... cargoInspected.fire(cargo); } public void onCargoInspected( @Observes @CargoInspected Cargo cargo) { @Qualifier @Retention(RUNTIME) @Target({FIELD, PARAMETER}) public @interface CargoInspected {}
Asynchronous CDI Events?
@Inject @CargoInspected Event<Cargo> cargoInspected; ... public void inspectCargo(TrackingId trackingId) { ... cargoInspected.fireAsync(cargo); } public void onCargoInspected(@Observes(async=true) @CargoInspected Cargo cargo) {
Asynchronous Servlets and NIO
-เพิ่มขนาดการใช้งาน Asynchronous Servlets
-Decouple connection จากการ request thread
-Return request thread กลับไปที่ pool
-การควบคุม IO/CPU เพื่อควบคุมการทำงานหนักในฝั่ง backend ที่แยกต่างหาก
-การปิดการเชื่อมต่อแคชเมื่อดำเนินการเสร็จ
NIO จะ removes possible thread blocks ในช่วงการ read/write ที่ช้า
-จะได้รับแจ้งเมื่อช่อง IO พร้อม
-โดยเฉพาะการ read/write เมื่อ IO channel พร้อม
-จำเป็นต้องเห็นการทำงานขณะ Servlet IO ทำงานหนัก มิฉะนั้นการแก้ปัญหาที่ซับซ้อนจะยากมาก
Asynchronous Servlet
@WebServlet(urlPatterns={"/report"}, asyncSupported=true) public class AsyncServlet extends HttpServlet { public void doGet(HttpServletRequest request, HttpServletResponse response) { ... final AsyncContext asyncContext = request.startAsync(); asyncContext.start(() -> { ReportParameters parameters = parseReportParameters(asyncContext.getRequest()); Report report = generateReport(parameters); printReport(report, asyncContext.getResponse()); asyncContext.complete(); }); } }
Asynchronous Servlet NIO (Output Stream)
private void printReport(Report report,AsyncContext context) { ServletOutputStream output = context.getResponse().getOutputStream(); WriteListener writeListener = new ReportWriteListener(output, report, context); output.setWriteListener(writeListener); }
Asynchronous Servlet NIO (Write Listener)
class ReportWriteListener implements WriteListener { private ServletOutputStream output = null; private InputStream input = null; private AsyncContext context = null; ReportWriteListener(ServletOutputStream output, Report report, AsyncContext context) { this.output = output; this.input = report.asPdfStream(); this.context = context; } ... public void onWritePossible() throws IOException { byte[] chunk = new byte[256]; int read = 0; while (output.isReady() && (read = input.read(chunk)) != -1) output.write(chunk, 0, read); if (read == -1) context.complete(); } public void onError(Throwable t) { context.complete(); t.printStackTrace(); } }
Asynchronous JAX-RS
-ความสามารถของ Asynchronous ถูกเพิ่มเข้ามาเพื่อ JAX-RS 2 / Java EE 7 ทั้งบน server และ client side
-ความสามารถ API ในการ async แบบสมมาตร จะสนับสนุนทั้ง Futures และการ callbacks
Asynchronous JAX-RS Resource
@Stateless @Path("/reports") public class ReportsResource { ... @Path("{id}") @GET @Produces({"application/pdf"}) @Asynchronous public void generateReport( @PathParam("id") Long id, @Suspended AsyncResponse response) { ResponseBuilder builder = Response.ok(renderReport(id)); builder.header("Content-Disposition","attachment; filename=report.pdf"); response.resume(builder.build()); } }
Asynchronous JAX-RS Client
WebTarget target = client.target("http://.../balance")... Future<Double> future = target.request().async().get(Double.class)); ... Double balance = future.get(); WebTarget target = client.target("http://.../balance")... target.request().async().get( new InvocationCallback<Double>() { public void complete(Double balance) { // Process balance } public void failed(InvocationException e) { // Process error } });
Asynchrony/NIO in WebSocket
-มีการทำงาน (Endpoint) โดยปลายทาง WebSocket ไม่ปิดกั้น
-ไม่มีการเชื่อมต่อในครั้งแรก ทั้ง server และ client side
-การเขียน / การส่งข้อมูล สามารถทำแบบ asynchronously เพื่อเพิ่มวิธีส่งผ่านข้อมูลที่ดีกว่า
-API สมมาตรมาก สำหรับวิธี sync และแบบ async
-สนับสนุน Futures callbacks
Asynchronous Remote WebSocket Endpoint
@ServerEndpoint(value = "/chat"...) @Singleton public class ChatServer { private Set<Session> peers = new HashSet<>(); @OnOpen public void onOpen(Session peer) { peers.add(peer); } @OnClose public void onClose(Session peer) { peers.remove(peer); } @OnMessage public void onMessage(ChatMessage message) { for (Session peer : peers) { ...peer.getAsyncRemote().sendObject(message)... } } }
Future vs Callback
Future<Void> future = peer.getAsyncRemote().sendObject(message); peer.getAsyncRemote().sendObject(message, (SendResult result) -> { ... if (!result.isOK()) { ...result.getException()... } ... });
Java EE Concurrency Utilities
-ช่วยให้การในระดับต่ำกว่าความสามารถในการ Threading และการ asynchronous ใน Java EE เรื่องความปลอดภัย และความน่าเชื่อถือ
-สร้างรหัสพิเศษมาก และปริมาณงานสามารถกำหนดเอง
การขยายขนาดของ Java SE Concurrency Utilities
-ManagedExecutorService
-ManagedThreadFactory
Managed Executor Service
@Path("/reports") public class ReportsResource { @Resource ManagedExecutorService executor; ... @Path("{id}") @GET @Produces({"application/pdf"}) public void generateReport( @PathParam("id") Long id, @Suspended AsyncResponse response) { executor.execute(() -> { ResponseBuilder builder = Response.ok(renderReport(id)); builder.header("Content-Disposition","attachment; filename=report.pdf"); response.resume(builder.build()); } } }
Executor Service API
public interface ManagedExecutorService extends ExecutorService { public void execute(Runnable command); public <T> Future<T> submit(Callable<T> task); public Future<?> submit(Runnable task); public <T> Future<T> submit(Runnable task, T result); ... }
Java SE 8 Completable Future
-ทั้ง Futures และ callbacks มีข้อบกพร่องร้ายแรงโดยเฉพาะอย่างยิ่ง Reactive code
-Java SE 8 สนับสนุนการทำงานเพื่ออนาคตที่ดีขึ้นอย่างมีนัยสำคัญ สำหรับ Reactive programming
-Non-blocking, event-driven, composable และ functional (รวมทั้ง lambdas)
-ง่ายต่อการรวมกับ Java EE 7 (Java EE และ Java SE)
The Problem with Futures (and Callbacks)
Person p = ... Future<Assets> f1 = executor.submit(() -> getAssets(p)); Future<Liabilities> f2 = executor.submit(() -> getLiabilities(p)); Future<Credit> f3 = executor.submit(() -> calculateCreditScore(f1.get(), f2.get())); // The unrelated calls below are now blocked for no reason. Future<History> f4 = executor.submit(() -> getHealthHistory(p)); Future<Health> f5 = executor.submit(() -> calculateHeathScore(f4.get())); // Unrelated paths join below. Future<Coverage> f6 = executor.submit(() -> underwrite(f3.get(), f5.get()));
ตัวอย่างการใช้งาน : https://github.com/m-reza-rahman/reactive_javaee/blob/master/CallbackHell.java
CompletableFuture Basics
@Resource ManagedExecutorService executor; ... public CompletableFuture<Confirmation> processPayment(Order order) { CompletableFuture<Confirmation> future = new CompletableFuture<>(); executor.execute(() -> { Confirmation status = ... future.complete(status); }); return future; } paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));
Functional Reactive to the Rescue?
CompletableFuture<Assets> getAssets = CompletableFuture.supplyAsync(() -> getAssets(person), executor); CompletableFuture<Liabilities> getLiabilities = CompletableFuture.supplyAsync(() -> getLiabilities(person), executor); CompletableFuture<Credit> calculateCreditScore = getAssets.thenCombineAsync(getLiabilities,(assets, liabilities) -> calculateCreditScore(assets, liabilities), executor); CompletableFuture<Health> calculateHeathScore = CompletableFuture.supplyAsync(() -> getHealthHistory(person), executor).thenApplyAsync(history -> calculateHeathScore(history), executor); Coverage coverage = calculateCreditScore.thenCombineAsync(calculateHeathScore,(credit, health) -> underwrite(credit, health), executor).join();
More Possibilities…
Reactive JPA?
-Async/NIO สนับสนุนในการอ้างอิง database driver/JDBC
-specialized thread pool
CompletableFuture<List<Country>> countries = em.createQuery("SELECT c FROM Country c", Country.class).async().getResultList();
Reactive MVC?
-คล้ายกับรูปแบบพื้นฐานใน JAX-RS/Servlet
-EJB async เป็นอีกรูปแบบที่เป็นไปได้
-แนวคิด Reactive JSF ทำได้ยากลำบาก
Summary
-Reactive programming มีเทคนิคดีขึ้น และอาจมีความสำคัญมากขึ้นในอนาคตอันใกล้
-Java EE มีการสนับสนุนสำหรับ Reactive model
-Reactive จะดีมากยิ่งขึ้นด้วย Java EE 8
-Java SE 8 ช่วยให้การเขียนโปรแกรมง่ายขึ้น
-ต้องระวังไว้ว่า การเขียนโปรแกรมแบบ Reactive เป็นวิธีที่ง่ายต่อการใช้งาน
Resources
-Java EE Tutorials
http://docs.oracle.com/javaee/7/tutorial/doc/home.htm
-Java SE Tutorials
http://docs.oracle.com/javase/tutorial/
-Digging Deeper
http://docs.oracle.com/javaee/7/firstcup/doc/home.htm
https://glassfish.java.net/hol/
http://cargotracker.java.net
-Java EE Transparent Expert Groups
http://javaee-spec.java.net
-Java EE Reference Implementation
http://glassfish.org
-The Aquarium
http://blogs.oracle.com/theaquarium
Reference : เอกสาร Reactive Java EE - Let Me Count the Ways! โดย Reza Rahman (Java EE/GlassFish Evangelist) จาก Oracle ในงาน JavaOne
ไม่มีความคิดเห็น :
แสดงความคิดเห็น