วันเสาร์ที่ 7 พฤษภาคม พ.ศ. 2559

Reactive Java EE


สิ่งที่เราควรจะรู้กับ  Reactive Java ???

-ความจริงของ Reactive คืออะไร?
-สิ่งที่น่าสนใจของ Reactive ใน Java EE
-สิ่งที่ Reactive ทำได้ใน Java SE 8?


Reactive: What’s in a Name?

“Reactive” คือ การแก้อุปสรรค์ใหญ่ เพื่อนำไปใช้งานในวงกว้างของนักพัฒนา

หลักความเสียงร่วมกัน จากความกังวลด้านการตลาด มีปัจจัย ดังนี้ 
-เหตุการณ์ที่ขับเคลื่อน (Event driven)
-ความไม่ตรงกัน (Asynchronous)
-การไม่ปิดกั้น (Non-blocking)
-Message driven

*** บางคนกังวลมากเกินไปเกี่ยวกับการใช้ Reactive ในเรื่อง Responsive, resilient, elastic, adaptive, fault-tolerant, scalable และอื่นๆ จากหลักการใช้งาน

มีอะไรที่เป็นเรื่องใหญ่ใน Reactive?

Reactive ได้เสนอเทคนิควิศวกรรมซอฟแวร์ที่สำคัญ คือ
-การสร้างประสบการณ์การใช้งาน และการตอบสนองการใช้งานที่มากขึ้น
-throughput สูง , เน้นฮาร์ดแวร์ที่ดีที่สุด และการใช้งาน CPU
-การประมวลผลเหตุการณ์ที่ซับซ้อน
-การใช้งานกับ Internet of Things (IoT), device-to-device communication
-การใช้งานมือถือ ที่มีฐานผู้ใช้ขนาดใหญ่ระดับโลกพร้อมกันกับการใช้งานที่มากขึ้น

Reactive ไม่จำเป็นต้องเป็นยารักษาระบบแบบครอบจักรวาล 
-Reactive ใช้หลัก Asynchronous (เหตุการณ์ที่ขับเคลื่อนด้วยรหัสอยู่เสมอ)
-เน้นขยายขีดความสามารถของฮาร์ดแวร์ และการสร้างคำตอบ เพื่อบำรุงรักษาระบบมากขึ้น

JMS and Message Driven Beans

JMS หนึ่งของ API ที่เก่าแก่ที่สุดใน Java EE สอดคล้องอย่างยิ่งกับหลัก Reactive คือ
-Message oriented middleware
-Message/event driven, asynchronous, loosely coupled, reliable, transactional, secure, durable, fault tolerant, error tolerant, clustered, monitored, administered
-Queues (point-to-point) และ topics (publish-subscribe)
-Data (body) และ metadata (properties filterable โดย selectors)
Message Driven Beans หลักสำหรับการจัดการข้อความ JMS ใน Java EE
-เพียงแค่สร้าง POJOs ด้วย annotations (metadata)
-Secure, transactional, thread-safe, pooled, reliable, monitored, load-balanced, fault-tolerant, error-tolerant

JMS Send

@Inject JMSContext jmsContext;
@Resource(lookup = "jms/HandlingEventRegistrationAttemptQueue")
Destination handlingEventQueue;
...
public void receivedHandlingEventRegistrationAttempt(HandlingEventRegistrationAttempt attempt) 
{
  ...
  jmsContext.createProducer()
      .setDeliveryMode(DeliveryMode.PERSISTENT) // The default :-)
      .setPriority(LOW_PRIORITY)
      .setDisableMessageID(true)
      .setDisableMessageTimestamp(true)
      .setStringProperty("source", source)
      .send(handlingEventQueue, attempt);
}

JMS MDB

@MessageDriven(activationConfig = {
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/HandlingEventRegistrationAttemptQueue"),
  @ActivationConfigProperty(propertyName = "messageSelector",  propertyValue = "source = 'mobile'")})
  public class HandlingEventRegistrationAttemptConsumer implements MessageListener {
  ...
  public void onMessage(Message message) {
     ...
     HandlingEventRegistrationAttempt attempt = message.getBody(HandlingEventRegistrationAttempt.class);
     ...
  }
}

Even กับ JMS 2.1?

@ApplicationScoped
@MaxConcurrency(10)
public class HandlingEventRegistrationAttemptConsumer {
  @JmsListener(
destinationLookup="jms/HandlingEventRegistrationAttemptQueue", selector="source = 'mobile'", batchSize=10, retry=5, retryDelay=7000, orderBy=TIMESTAMP)
public void onEventRegistrationAttempt(HandlingEventRegistrationAttempt... attempts) {
    ...
  }
}

Asynchronous Session Beans

Dead simple asynchrony ที่ระดับ component
-เพียงใส่ annotation ประกอบใน POJO
-ประเภทผลตอบแทนที่เรียบง่าย : void (fire-and-forget) หรือ Future<V> (Client ประมวลผลไม่ตรงกัน)

Reactive ตอบสนองความต้องการที่มีมากขึ้นตลอด หรือการตอบสนองมีมากขึ้น
-การทำธุรกรรม, ความปลอดภัย, การตรวจสอบ, thread-safe, การสำรอง
-Not loosely coupled, persistent, fault tolerant หรือ error tolerant (client ต้องชัดเจนในการจัดการข้อผิดพลาด)

Asynchronous Session Bean

@Stateless
public class ReportGeneratorService {
  @Asynchronous
  public Future<Report> generateReport(ReportParameters params){
    try{
    Report report = renderReport(params);
    return new AsyncResult(report);
    } catch(ReportGenerationException e) {
      return new AsyncResult(new ErrorReport(e));       
    }

@Asynchronous
public void processPayment(Payment payment){
  // CPU/IO heavy tasks to process a payment
}

Asynchronous Session Bean Client

@Inject ReportGeneratorService reportGeneratorService;
...
Future<Report> future = reportGeneratorService.generateReport(parameters);
...
if (future.isDone()) {
    Report report = future.get();
    ...
}
...
future.cancel(true);

@Asynchronous + CompletableFuture?

@Asynchronous
public CompletableFuture<Confirmation> processPayment(Order order) {
  ...
  Confirmation status = ...;
  return
  CompletableFuture<Confirmation>.completedFuture(status);
}
paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));

CDI Events/Observers

-เน้นกะทัดรัด, ง่าย, สง่า, เหตุการณ์ชนิดปลอดภัย
-เป็นหลักการสังเกตการณ์อย่างเป็นทางการผ่านทาง DI framework กับคำอธิบายประกอบ (annotations)
-นำเสนอโซลูชั่นที่ดีในการ loose-coupling และ ชนิด type-safe filtering/การผูกมัด รวมทั้ง asynchrony แบบ one-to-one หรือ one-to-many

CDI Events

@Inject @CargoInspected Event<Cargo> cargoInspected;
...
public void inspectCargo(TrackingId trackingId) {
  ...
  cargoInspected.fire(cargo);
}


public void onCargoInspected(
    @Observes @CargoInspected Cargo cargo) {


@Qualifier
@Retention(RUNTIME) @Target({FIELD, PARAMETER})
public @interface CargoInspected {}

Asynchronous CDI Events?

@Inject @CargoInspected Event<Cargo> cargoInspected;
...
public void inspectCargo(TrackingId trackingId) {
  ...
  cargoInspected.fireAsync(cargo);
}
public void onCargoInspected(@Observes(async=true) @CargoInspected Cargo cargo) {

Asynchronous Servlets and NIO

-เพิ่มขนาดการใช้งาน Asynchronous Servlets
-Decouple connection จากการ request thread
-Return request thread กลับไปที่ pool
-การควบคุม IO/CPU เพื่อควบคุมการทำงานหนักในฝั่ง backend ที่แยกต่างหาก
-การปิดการเชื่อมต่อแคชเมื่อดำเนินการเสร็จ

NIO จะ removes possible thread blocks ในช่วงการ read/write ที่ช้า
-จะได้รับแจ้งเมื่อช่อง IO พร้อม
-โดยเฉพาะการ read/write เมื่อ IO channel พร้อม
-จำเป็นต้องเห็นการทำงานขณะ Servlet IO ทำงานหนัก มิฉะนั้นการแก้ปัญหาที่ซับซ้อนจะยากมาก

Asynchronous Servlet

@WebServlet(urlPatterns={"/report"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
  public void doGet(HttpServletRequest request,
      HttpServletResponse response) {
    ...
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.start(() -> {
      ReportParameters parameters =  parseReportParameters(asyncContext.getRequest());
      Report report =  generateReport(parameters);
      printReport(report, asyncContext.getResponse());
      asyncContext.complete();
    });
  }
}

Asynchronous Servlet NIO (Output Stream)

private void printReport(Report report,AsyncContext context) {
  ServletOutputStream output = context.getResponse().getOutputStream();
  WriteListener writeListener = new ReportWriteListener(output, report, context);
  output.setWriteListener(writeListener);
}

Asynchronous Servlet NIO (Write Listener)

class ReportWriteListener implements WriteListener {
  private ServletOutputStream output = null;
  private InputStream input = null;
  private AsyncContext context = null;
  ReportWriteListener(ServletOutputStream output, Report report, AsyncContext context) {
    this.output = output;
    this.input = report.asPdfStream();
    this.context = context;
  }
  ...
  public void onWritePossible() throws IOException {
    byte[] chunk = new byte[256];
    int read = 0;
    while (output.isReady() && (read = input.read(chunk)) != -1)
      output.write(chunk, 0, read);
  
    if (read == -1)
      context.complete();
  }
  public void onError(Throwable t) {
    context.complete();
    t.printStackTrace();
  }
}

Asynchronous JAX-RS

-ความสามารถของ Asynchronous ถูกเพิ่มเข้ามาเพื่อ JAX-RS 2 / Java EE 7 ทั้งบน server และ client side
-ความสามารถ API ในการ async แบบสมมาตร จะสนับสนุนทั้ง Futures และการ callbacks

Asynchronous JAX-RS Resource

@Stateless
@Path("/reports")
public class ReportsResource {
  ...
  @Path("{id}")
  @GET
  @Produces({"application/pdf"})
  @Asynchronous
  public void generateReport(
      @PathParam("id") Long id,
      @Suspended AsyncResponse response) {
    ResponseBuilder builder = Response.ok(renderReport(id));
builder.header("Content-Disposition","attachment; filename=report.pdf");
response.resume(builder.build());
  }
}

Asynchronous JAX-RS Client

WebTarget target = client.target("http://.../balance")...
Future<Double> future = target.request().async().get(Double.class));
...
Double balance = future.get();
WebTarget target = client.target("http://.../balance")...
target.request().async().get(
    new InvocationCallback<Double>() {
        public void complete(Double balance) {
          // Process balance
        }
        public void failed(InvocationException e) {
          // Process error
        }
    });

Asynchrony/NIO in WebSocket

-มีการทำงาน (Endpoint) โดยปลายทาง WebSocket ไม่ปิดกั้น
-ไม่มีการเชื่อมต่อในครั้งแรก ทั้ง server และ client side
-การเขียน / การส่งข้อมูล สามารถทำแบบ asynchronously เพื่อเพิ่มวิธีส่งผ่านข้อมูลที่ดีกว่า
-API สมมาตรมาก สำหรับวิธี sync และแบบ async
-สนับสนุน Futures callbacks

Asynchronous Remote WebSocket Endpoint

@ServerEndpoint(value = "/chat"...)
@Singleton
public class ChatServer {
  private Set<Session> peers = new HashSet<>();
  @OnOpen
  public void onOpen(Session peer) {
    peers.add(peer);
  }
  @OnClose
  public void onClose(Session peer) {
    peers.remove(peer);
  }
  @OnMessage
  public void onMessage(ChatMessage message) {
    for (Session peer : peers) {
      ...peer.getAsyncRemote().sendObject(message)...
    }
  }
}

Future vs Callback

Future<Void> future = peer.getAsyncRemote().sendObject(message);


peer.getAsyncRemote().sendObject(message,
    (SendResult result) -> {
      ...
      if (!result.isOK()) {
        ...result.getException()...
      }
      ...
    });

Java EE Concurrency Utilities

-ช่วยให้การในระดับต่ำกว่าความสามารถในการ Threading และการ asynchronous ใน Java EE เรื่องความปลอดภัย และความน่าเชื่อถือ
-สร้างรหัสพิเศษมาก และปริมาณงานสามารถกำหนดเอง

การขยายขนาดของ Java SE Concurrency Utilities
-ManagedExecutorService
-ManagedThreadFactory

Managed Executor Service

@Path("/reports")
public class ReportsResource {
  @Resource ManagedExecutorService executor;
  ...
  @Path("{id}")
  @GET
  @Produces({"application/pdf"})
  public void generateReport(
      @PathParam("id") Long id,
      @Suspended AsyncResponse response) {
    executor.execute(() -> {
      ResponseBuilder builder = Response.ok(renderReport(id));
builder.header("Content-Disposition","attachment; filename=report.pdf");
    response.resume(builder.build());
    }
  }
}

Executor Service API

public interface ManagedExecutorService extends ExecutorService {
  public void execute(Runnable command);
  public <T> Future<T> submit(Callable<T> task);
  public Future<?> submit(Runnable task);
  public <T> Future<T> submit(Runnable task, T result);
  ...
}

Java SE 8 Completable Future

-ทั้ง Futures และ callbacks มีข้อบกพร่องร้ายแรงโดยเฉพาะอย่างยิ่ง Reactive code
-Java SE 8 สนับสนุนการทำงานเพื่ออนาคตที่ดีขึ้นอย่างมีนัยสำคัญ สำหรับ Reactive programming
-Non-blocking, event-driven, composable และ functional (รวมทั้ง lambdas)
-ง่ายต่อการรวมกับ Java EE 7  (Java EE และ Java SE)

The Problem with Futures (and Callbacks)

Person p = ...
Future<Assets> f1 = executor.submit(() -> getAssets(p));
Future<Liabilities> f2 = executor.submit(() -> getLiabilities(p));
Future<Credit> f3 = executor.submit(() -> calculateCreditScore(f1.get(), f2.get()));
// The unrelated calls below are now blocked for no reason.
Future<History> f4 = executor.submit(() -> getHealthHistory(p));
Future<Health> f5 = executor.submit(() -> calculateHeathScore(f4.get()));
// Unrelated paths join below.
Future<Coverage> f6 = executor.submit(() -> underwrite(f3.get(), f5.get()));

ตัวอย่างการใช้งาน : https://github.com/m-reza-rahman/reactive_javaee/blob/master/CallbackHell.java

CompletableFuture Basics

@Resource ManagedExecutorService executor;
...
public CompletableFuture<Confirmation> processPayment(Order order) {
  CompletableFuture<Confirmation> future = new CompletableFuture<>();
  executor.execute(() -> {
    Confirmation status = ...
    future.complete(status);
  });
  return future;
}
paymentService.processPayment(order).thenAccept(confirmation -> System.out.println(confirmation));


Functional Reactive to the Rescue?

CompletableFuture<Assets> getAssets = CompletableFuture.supplyAsync(() -> getAssets(person), executor);


CompletableFuture<Liabilities> getLiabilities = CompletableFuture.supplyAsync(() -> getLiabilities(person), executor);


CompletableFuture<Credit> calculateCreditScore = getAssets.thenCombineAsync(getLiabilities,(assets, liabilities) -> 


calculateCreditScore(assets, liabilities), executor);


CompletableFuture<Health> calculateHeathScore = CompletableFuture.supplyAsync(() -> getHealthHistory(person), executor).thenApplyAsync(history -> calculateHeathScore(history), executor);


Coverage coverage = calculateCreditScore.thenCombineAsync(calculateHeathScore,(credit, health) -> underwrite(credit, health), executor).join();

More Possibilities…

Reactive JPA?
-Async/NIO สนับสนุนในการอ้างอิง  database driver/JDBC
-specialized thread pool

CompletableFuture<List<Country>> countries = em.createQuery("SELECT c FROM Country c", Country.class).async().getResultList();


Reactive MVC?
-คล้ายกับรูปแบบพื้นฐานใน JAX-RS/Servlet
-EJB async เป็นอีกรูปแบบที่เป็นไปได้
-แนวคิด Reactive JSF ทำได้ยากลำบาก

Summary
-Reactive programming มีเทคนิคดีขึ้น และอาจมีความสำคัญมากขึ้นในอนาคตอันใกล้
-Java EE มีการสนับสนุนสำหรับ Reactive model
-Reactive จะดีมากยิ่งขึ้นด้วย Java EE 8
-Java SE 8 ช่วยให้การเขียนโปรแกรมง่ายขึ้น
-ต้องระวังไว้ว่า การเขียนโปรแกรมแบบ Reactive เป็นวิธีที่ง่ายต่อการใช้งาน

Resources

-Java EE Tutorials
http://docs.oracle.com/javaee/7/tutorial/doc/home.htm
-Java SE Tutorials
http://docs.oracle.com/javase/tutorial/
-Digging Deeper
http://docs.oracle.com/javaee/7/firstcup/doc/home.htm
https://glassfish.java.net/hol/
http://cargotracker.java.net
-Java EE Transparent Expert Groups
http://javaee-spec.java.net
-Java EE Reference Implementation
http://glassfish.org
-The Aquarium
http://blogs.oracle.com/theaquarium

Reference : เอกสาร Reactive Java EE - Let Me Count the Ways! โดย Reza Rahman (Java EE/GlassFish Evangelist) จาก Oracle ในงาน JavaOne
top.sk Web Developer

I can design web applications by using Java Server Faces (JSF), Primefaces, EJB3, SQL, DB2 (IBM) and designing report (Word, Excel and PDF) by using XML Script and Crystal Clear Report for the organization that can be easily and no problem for used and they can use the Web to manage the customer's organization effectively. I want to learn a new culture, technology and colleagues involved in the IT profession.

ไม่มีความคิดเห็น :

แสดงความคิดเห็น