-ความจริงของ Reactive คืออะไร?
-สิ่งที่น่าสนใจของ Reactive ใน Java EE
-สิ่งที่ Reactive ทำได้ใน Java SE 8?
Reactive: What’s in a Name?
“Reactive” คือ การแก้อุปสรรค์ใหญ่ เพื่อนำไปใช้งานในวงกว้างของนักพัฒนา
หลักความเสียงร่วมกัน จากความกังวลด้านการตลาด มีปัจจัย ดังนี้
-เหตุการณ์ที่ขับเคลื่อน (Event driven)
-ความไม่ตรงกัน (Asynchronous)
-การไม่ปิดกั้น (Non-blocking)
-Message driven
*** บางคนกังวลมากเกินไปเกี่ยวกับการใช้ Reactive ในเรื่อง Responsive, resilient, elastic, adaptive, fault-tolerant, scalable และอื่นๆ จากหลักการใช้งาน
มีอะไรที่เป็นเรื่องใหญ่ใน Reactive?
Reactive ได้เสนอเทคนิควิศวกรรมซอฟแวร์ที่สำคัญ คือ
-การสร้างประสบการณ์การใช้งาน และการตอบสนองการใช้งานที่มากขึ้น
-throughput สูง , เน้นฮาร์ดแวร์ที่ดีที่สุด และการใช้งาน CPU
-การประมวลผลเหตุการณ์ที่ซับซ้อน
-การใช้งานกับ Internet of Things (IoT), device-to-device communication
-การใช้งานมือถือ ที่มีฐานผู้ใช้ขนาดใหญ่ระดับโลกพร้อมกันกับการใช้งานที่มากขึ้น
Reactive ไม่จำเป็นต้องเป็นยารักษาระบบแบบครอบจักรวาล
-Reactive ใช้หลัก Asynchronous (เหตุการณ์ที่ขับเคลื่อนด้วยรหัสอยู่เสมอ)
-เน้นขยายขีดความสามารถของฮาร์ดแวร์ และการสร้างคำตอบ เพื่อบำรุงรักษาระบบมากขึ้น
JMS and Message Driven Beans
JMS หนึ่งของ API ที่เก่าแก่ที่สุดใน Java EE สอดคล้องอย่างยิ่งกับหลัก Reactive คือ
-Message oriented middleware
-Message/event driven, asynchronous, loosely coupled, reliable, transactional, secure, durable, fault tolerant, error tolerant, clustered, monitored, administered
-Queues (point-to-point) และ topics (publish-subscribe)
-Data (body) และ metadata (properties filterable โดย selectors)
Message Driven Beans หลักสำหรับการจัดการข้อความ JMS ใน Java EE
-เพียงแค่สร้าง POJOs ด้วย annotations (metadata)
-Secure, transactional, thread-safe, pooled, reliable, monitored, load-balanced, fault-tolerant, error-tolerant
JMS Send
@Inject JMSContext jmsContext;
@Resource(lookup = "jms/HandlingEventRegistrationAttemptQueue")
Destination handlingEventQueue;
...
public void receivedHandlingEventRegistrationAttempt(HandlingEventRegistrationAttempt attempt) 
{
  ...
  jmsContext.createProducer()
      .setDeliveryMode(DeliveryMode.PERSISTENT) // The default :-)
      .setPriority(LOW_PRIORITY)
      .setDisableMessageID(true)
      .setDisableMessageTimestamp(true)
      .setStringProperty("source", source)
      .send(handlingEventQueue, attempt);
}
JMS MDB
@MessageDriven(activationConfig = {
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/HandlingEventRegistrationAttemptQueue"),
  @ActivationConfigProperty(propertyName = "messageSelector",  propertyValue = "source = 'mobile'")})
  public class HandlingEventRegistrationAttemptConsumer implements MessageListener {
  ...
  public void onMessage(Message message) {
     ...
     HandlingEventRegistrationAttempt attempt = message.getBody(HandlingEventRegistrationAttempt.class);
     ...
  }
}
Even กับ JMS 2.1?
@ApplicationScoped
@MaxConcurrency(10)
public class HandlingEventRegistrationAttemptConsumer {
  @JmsListener(
destinationLookup="jms/HandlingEventRegistrationAttemptQueue", selector="source = 'mobile'", batchSize=10, retry=5, retryDelay=7000, orderBy=TIMESTAMP)
public void onEventRegistrationAttempt(HandlingEventRegistrationAttempt... attempts) {
    ...
  }
}
Asynchronous Session Beans
Dead simple asynchrony ที่ระดับ component
-เพียงใส่ annotation ประกอบใน POJO
-ประเภทผลตอบแทนที่เรียบง่าย : void (fire-and-forget) หรือ Future<V> (Client ประมวลผลไม่ตรงกัน)
Reactive ตอบสนองความต้องการที่มีมากขึ้นตลอด หรือการตอบสนองมีมากขึ้น
-การทำธุรกรรม, ความปลอดภัย, การตรวจสอบ, thread-safe, การสำรอง
-Not loosely coupled, persistent, fault tolerant หรือ error tolerant (client ต้องชัดเจนในการจัดการข้อผิดพลาด)
Asynchronous Session Bean
@Stateless
public class ReportGeneratorService {
  @Asynchronous
  public Future<Report> generateReport(ReportParameters params){
    try{
    Report report = renderReport(params);
    return new AsyncResult(report);
    } catch(ReportGenerationException e) {
      return new AsyncResult(new ErrorReport(e));       
    }
@Asynchronous
public void processPayment(Payment payment){
  // CPU/IO heavy tasks to process a payment
}
Asynchronous Session Bean Client
@Inject ReportGeneratorService reportGeneratorService;
...
Future<Report> future = reportGeneratorService.generateReport(parameters);
...
if (future.isDone()) {
    Report report = future.get();
    ...
}
...
future.cancel(true);
@Asynchronous + CompletableFuture?
@Asynchronous
public CompletableFuture<Confirmation> processPayment(Order order) {
  ...
  Confirmation status = ...;
  return
  CompletableFuture<Confirmation>.completedFuture(status);
}
paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));
CDI Events/Observers
-เน้นกะทัดรัด, ง่าย, สง่า, เหตุการณ์ชนิดปลอดภัย
-เป็นหลักการสังเกตการณ์อย่างเป็นทางการผ่านทาง DI framework กับคำอธิบายประกอบ (annotations)
-นำเสนอโซลูชั่นที่ดีในการ loose-coupling และ ชนิด type-safe filtering/การผูกมัด รวมทั้ง asynchrony แบบ one-to-one หรือ one-to-many
CDI Events
@Inject @CargoInspected Event<Cargo> cargoInspected;
...
public void inspectCargo(TrackingId trackingId) {
  ...
  cargoInspected.fire(cargo);
}
public void onCargoInspected(
    @Observes @CargoInspected Cargo cargo) {
@Qualifier
@Retention(RUNTIME) @Target({FIELD, PARAMETER})
public @interface CargoInspected {}
Asynchronous CDI Events?
@Inject @CargoInspected Event<Cargo> cargoInspected;
...
public void inspectCargo(TrackingId trackingId) {
  ...
  cargoInspected.fireAsync(cargo);
}
public void onCargoInspected(@Observes(async=true) @CargoInspected Cargo cargo) {
Asynchronous Servlets and NIO
-เพิ่มขนาดการใช้งาน Asynchronous Servlets
-Decouple connection จากการ request thread
-Return request thread กลับไปที่ pool
-การควบคุม IO/CPU เพื่อควบคุมการทำงานหนักในฝั่ง backend ที่แยกต่างหาก
-การปิดการเชื่อมต่อแคชเมื่อดำเนินการเสร็จ
NIO จะ removes possible thread blocks ในช่วงการ read/write ที่ช้า
-จะได้รับแจ้งเมื่อช่อง IO พร้อม
-โดยเฉพาะการ read/write เมื่อ IO channel พร้อม
-จำเป็นต้องเห็นการทำงานขณะ Servlet IO ทำงานหนัก มิฉะนั้นการแก้ปัญหาที่ซับซ้อนจะยากมาก
Asynchronous Servlet
@WebServlet(urlPatterns={"/report"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
  public void doGet(HttpServletRequest request,
      HttpServletResponse response) {
    ...
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.start(() -> {
      ReportParameters parameters =  parseReportParameters(asyncContext.getRequest());
      Report report =  generateReport(parameters);
      printReport(report, asyncContext.getResponse());
      asyncContext.complete();
    });
  }
}
Asynchronous Servlet NIO (Output Stream)
private void printReport(Report report,AsyncContext context) {
  ServletOutputStream output = context.getResponse().getOutputStream();
  WriteListener writeListener = new ReportWriteListener(output, report, context);
  output.setWriteListener(writeListener);
}
Asynchronous Servlet NIO (Write Listener)
class ReportWriteListener implements WriteListener {
  private ServletOutputStream output = null;
  private InputStream input = null;
  private AsyncContext context = null;
  ReportWriteListener(ServletOutputStream output, Report report, AsyncContext context) {
    this.output = output;
    this.input = report.asPdfStream();
    this.context = context;
  }
  ...
  public void onWritePossible() throws IOException {
    byte[] chunk = new byte[256];
    int read = 0;
    while (output.isReady() && (read = input.read(chunk)) != -1)
      output.write(chunk, 0, read);
  
    if (read == -1)
      context.complete();
  }
  public void onError(Throwable t) {
    context.complete();
    t.printStackTrace();
  }
}
Asynchronous JAX-RS
-ความสามารถของ Asynchronous ถูกเพิ่มเข้ามาเพื่อ JAX-RS 2 / Java EE 7 ทั้งบน server และ client side
-ความสามารถ API ในการ async แบบสมมาตร จะสนับสนุนทั้ง Futures และการ callbacks
Asynchronous JAX-RS Resource
@Stateless
@Path("/reports")
public class ReportsResource {
  ...
  @Path("{id}")
  @GET
  @Produces({"application/pdf"})
  @Asynchronous
  public void generateReport(
      @PathParam("id") Long id,
      @Suspended AsyncResponse response) {
    ResponseBuilder builder = Response.ok(renderReport(id));
builder.header("Content-Disposition","attachment; filename=report.pdf");
response.resume(builder.build());
  }
}
Asynchronous JAX-RS Client
WebTarget target = client.target("http://.../balance")...
Future<Double> future = target.request().async().get(Double.class));
...
Double balance = future.get();
WebTarget target = client.target("http://.../balance")...
target.request().async().get(
    new InvocationCallback<Double>() {
        public void complete(Double balance) {
          // Process balance
        }
        public void failed(InvocationException e) {
          // Process error
        }
    });
Asynchrony/NIO in WebSocket
-มีการทำงาน (Endpoint) โดยปลายทาง WebSocket ไม่ปิดกั้น
-ไม่มีการเชื่อมต่อในครั้งแรก ทั้ง server และ client side
-การเขียน / การส่งข้อมูล สามารถทำแบบ asynchronously เพื่อเพิ่มวิธีส่งผ่านข้อมูลที่ดีกว่า
-API สมมาตรมาก สำหรับวิธี sync และแบบ async
-สนับสนุน Futures callbacks
Asynchronous Remote WebSocket Endpoint
@ServerEndpoint(value = "/chat"...)
@Singleton
public class ChatServer {
  private Set<Session> peers = new HashSet<>();
  @OnOpen
  public void onOpen(Session peer) {
    peers.add(peer);
  }
  @OnClose
  public void onClose(Session peer) {
    peers.remove(peer);
  }
  @OnMessage
  public void onMessage(ChatMessage message) {
    for (Session peer : peers) {
      ...peer.getAsyncRemote().sendObject(message)...
    }
  }
}
Future vs Callback
Future<Void> future = peer.getAsyncRemote().sendObject(message);
peer.getAsyncRemote().sendObject(message,
    (SendResult result) -> {
      ...
      if (!result.isOK()) {
        ...result.getException()...
      }
      ...
    });
Java EE Concurrency Utilities
-ช่วยให้การในระดับต่ำกว่าความสามารถในการ Threading และการ asynchronous ใน Java EE เรื่องความปลอดภัย และความน่าเชื่อถือ
-สร้างรหัสพิเศษมาก และปริมาณงานสามารถกำหนดเอง
การขยายขนาดของ Java SE Concurrency Utilities
-ManagedExecutorService
-ManagedThreadFactory
Managed Executor Service
@Path("/reports")
public class ReportsResource {
  @Resource ManagedExecutorService executor;
  ...
  @Path("{id}")
  @GET
  @Produces({"application/pdf"})
  public void generateReport(
      @PathParam("id") Long id,
      @Suspended AsyncResponse response) {
    executor.execute(() -> {
      ResponseBuilder builder = Response.ok(renderReport(id));
builder.header("Content-Disposition","attachment; filename=report.pdf");
    response.resume(builder.build());
    }
  }
}
Executor Service API
public interface ManagedExecutorService extends ExecutorService {
  public void execute(Runnable command);
  public <T> Future<T> submit(Callable<T> task);
  public Future<?> submit(Runnable task);
  public <T> Future<T> submit(Runnable task, T result);
  ...
}
Java SE 8 Completable Future
-ทั้ง Futures และ callbacks มีข้อบกพร่องร้ายแรงโดยเฉพาะอย่างยิ่ง Reactive code
-Java SE 8 สนับสนุนการทำงานเพื่ออนาคตที่ดีขึ้นอย่างมีนัยสำคัญ สำหรับ Reactive programming
-Non-blocking, event-driven, composable และ functional (รวมทั้ง lambdas)
-ง่ายต่อการรวมกับ Java EE 7 (Java EE และ Java SE)
The Problem with Futures (and Callbacks)
Person p = ... Future<Assets> f1 = executor.submit(() -> getAssets(p)); Future<Liabilities> f2 = executor.submit(() -> getLiabilities(p)); Future<Credit> f3 = executor.submit(() -> calculateCreditScore(f1.get(), f2.get())); // The unrelated calls below are now blocked for no reason. Future<History> f4 = executor.submit(() -> getHealthHistory(p)); Future<Health> f5 = executor.submit(() -> calculateHeathScore(f4.get())); // Unrelated paths join below. Future<Coverage> f6 = executor.submit(() -> underwrite(f3.get(), f5.get()));
ตัวอย่างการใช้งาน : https://github.com/m-reza-rahman/reactive_javaee/blob/master/CallbackHell.java
CompletableFuture Basics
@Resource ManagedExecutorService executor;
...
public CompletableFuture<Confirmation> processPayment(Order order) {
  CompletableFuture<Confirmation> future = new CompletableFuture<>();
  executor.execute(() -> {
    Confirmation status = ...
    future.complete(status);
  });
  return future;
}
paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));
Functional Reactive to the Rescue?
CompletableFuture<Assets> getAssets = CompletableFuture.supplyAsync(() -> getAssets(person), executor); CompletableFuture<Liabilities> getLiabilities = CompletableFuture.supplyAsync(() -> getLiabilities(person), executor); CompletableFuture<Credit> calculateCreditScore = getAssets.thenCombineAsync(getLiabilities,(assets, liabilities) -> calculateCreditScore(assets, liabilities), executor); CompletableFuture<Health> calculateHeathScore = CompletableFuture.supplyAsync(() -> getHealthHistory(person), executor).thenApplyAsync(history -> calculateHeathScore(history), executor); Coverage coverage = calculateCreditScore.thenCombineAsync(calculateHeathScore,(credit, health) -> underwrite(credit, health), executor).join();
More Possibilities…
Reactive JPA?
-Async/NIO สนับสนุนในการอ้างอิง database driver/JDBC
-specialized thread pool
CompletableFuture<List<Country>> countries = em.createQuery("SELECT c FROM Country c", Country.class).async().getResultList();
Reactive MVC?
-คล้ายกับรูปแบบพื้นฐานใน JAX-RS/Servlet
-EJB async เป็นอีกรูปแบบที่เป็นไปได้
-แนวคิด Reactive JSF ทำได้ยากลำบาก
Summary
-Reactive programming มีเทคนิคดีขึ้น และอาจมีความสำคัญมากขึ้นในอนาคตอันใกล้
-Java EE มีการสนับสนุนสำหรับ Reactive model
-Reactive จะดีมากยิ่งขึ้นด้วย Java EE 8
-Java SE 8 ช่วยให้การเขียนโปรแกรมง่ายขึ้น
-ต้องระวังไว้ว่า การเขียนโปรแกรมแบบ Reactive เป็นวิธีที่ง่ายต่อการใช้งาน
Resources
-Java EE Tutorials
http://docs.oracle.com/javaee/7/tutorial/doc/home.htm
-Java SE Tutorials
http://docs.oracle.com/javase/tutorial/
-Digging Deeper
http://docs.oracle.com/javaee/7/firstcup/doc/home.htm
https://glassfish.java.net/hol/
http://cargotracker.java.net
-Java EE Transparent Expert Groups
http://javaee-spec.java.net
-Java EE Reference Implementation
http://glassfish.org
-The Aquarium
http://blogs.oracle.com/theaquarium
Reference : เอกสาร Reactive Java EE - Let Me Count the Ways! โดย Reza Rahman (Java EE/GlassFish Evangelist) จาก Oracle ในงาน JavaOne

ไม่มีความคิดเห็น :
แสดงความคิดเห็น