Documentation
More info can be found here.
What's New
version 0.3.6
Added subscribe to routepoint. To get the reply from a route you can do: reply = await routepoint.tasks().next(new TxRouteServiceTask<any>({..}, {..}));
You can now also subscribe on the reply channel. See RoutePoint for more details.
version 0.3.4
Changed TxMountPoint so that a reply is sent back only to the sender, similar to TxDoublePoint. You still have the option of sending the reply back to all subscribers on the reply channel.
version 0.3.0
Refactoring of the routepoint code. A new API is available for C2C (Component-2-Component) communication.
Added support for routepoint publication and discovery services.
You can use typescript-publisher as the publication and discovery service. See below for more details.
version 0.2.21
- Rx-TXJS now supports DAG jobs; see tests/job/tx-job-execute-goto-simple.spec.ts for an example.
- Added a short version of mountpoint subscription; see tests/mountpoint/tx-mountpoint-method.spec.ts for an example.
Introduction
Rx-txjs implements an execution model based on decoupled objects (Components) in a Node.js environment.
You implement your business logic using components. Components are regular classes that register themselves with rx-txjs through a mountpoint. A mountpoint is an rx-txjs class that interacts with your class through a publish / subscribe model.
Note: a component is activated by publishing a "message" to it. Rx-txjs uses mountpoints to publish a message to a component.
A job is used to run a sequence of components to accomplish complex business logic.
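The decoupling described above can be pictured in a few lines of plain TypeScript. This is only a sketch: `MiniChannel`, `MiniMountPoint` and `HelloComponent` are hypothetical stand-ins written for illustration, not rx-txjs classes, and the hand-rolled channel here is far simpler than what the library provides.

```typescript
// Hypothetical stand-ins for illustration only; these are NOT rx-txjs classes.
type Listener<T> = (value: T) => void;

class MiniChannel<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  // deliver the value to every subscriber, in subscription order
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

class MiniMountPoint<T> {
  readonly tasks = new MiniChannel<T>(); // caller -> component
  readonly reply = new MiniChannel<T>(); // component -> caller
}

class HelloComponent {
  constructor(readonly mountpoint: MiniMountPoint<string>) {
    // the component is activated by a message, never by a direct method call
    mountpoint.tasks.subscribe((who) => mountpoint.reply.next(`Hello ${who}`));
  }
}

const helloPoint = new MiniMountPoint<string>();
new HelloComponent(helloPoint);

const received: string[] = [];
helloPoint.reply.subscribe((message) => received.push(message));
helloPoint.tasks.next('World');
```

The caller only ever touches the mountpoint, so the component class can be replaced without changing the calling code.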
Rx-TXJS Hello World
Define a component that returns 'Hello':
// define Hello component
Create an instance of World so that it registers itself under TxMountPointRxJSRegistry, then send a message to it.
new World();
// get a mountpoint of Hello component so you can interact with it
// subscribe to messages coming from Hello component
mountpoint.reply().subscribe(...);
// send a message to World component
mountpoint.tasks().next(new TxTask({..}, 'Hello'));
A Short Version of World Component
// define World component
// call it where {method: 'run'} is in the head of TxTask
mountpoint.tasks().next(new TxTask({method: 'run'}, 'Hello'));
To interact with a Component you use one of several kinds of MountPoint objects.
MountPoints
- TxMountPoint - implements the traditional two-way 1:N publish / subscribe model. With TxMountPoint you can send a message to a Component and have multiple subscribers listen to its reply (see the link above for full documentation).
- TxSinglePoint - implements a 1:1 one-way communication channel with the Component. Use this MountPoint to send data; to get a reply you have to provide your own (other) TxSinglePoint object.
- TxDoublePoint - a wrapper that includes two different SinglePoints to implement bi-directional communication channels with a Component.
- TxRoutePoint - a kind of MountPoint that implements Class-2-Class direct communication over Express (HTTP).
- TxQueuePoint - a kind of MountPoint that implements Class-2-Class direct communication over a queue system.
Feature List
- Job Execution - able to run a series of Components that together implement a more complex job.
- Job Continue / Step / Undo - features of job execution (see the link above for full documentation).
- Persistence - able to save each step of execution. This feature fits well with 'Job Continue', so you can run a job until a certain point, stop it and resume it later on.
- Error Handling - in case of an error, each object gets the chance to clean up.
- Job Events - a job has many events you can listen to during its execution.
- Recording - record the data passing between objects during the execution of a Job.
- C2C - Class-2-Class communication enables direct communication between two Components over some communication channel. This could be HTTP (node express) or some kind of queue system (like Kafka, RabbitMQ).
- S2S - Cross Service Job, which enables running a job spread over several services.
- Monitor - a full monitoring solution.
- Distribution - run a job's components in different service / container instances.
Quick Start
Using a Component
Add the following line to your class:
mountpoint = TxMountPointRegistry.instance.create('MOUNTPOINT::NAME');
Note: you can use any kind of mountpoint, depending on your specific case.
For example, your component may look like this:
To send a message to your component, use the following line:
mountpoint.tasks().next(new TxTask({..}, {..}));
Using a Job
Create the components you want to be part of a Job (say C2Component and C3Component) in the same way as C1Component.
Note: when using a component as part of a job you must use TxSinglePoint. So in your Component define:
Define a Job as follows:
job.add(TxSinglePointRegistry.instance.get('GITHUB::GIST::C1'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::GIST::C2'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::GIST::C3'));
job.execute(new TxTask('create', 'begin', {..}));
This runs the components in the order they were added. It is a very simple version of the execution options; job.execute includes many more options, such as persist, recording, run-until and so on.
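As a rough mental model of this sequential execution, a job can be thought of as a list of steps where each step's output becomes the next step's input. The sketch below uses hypothetical names (`Step`, `MiniJob`) and plain synchronous functions; it does not use the rx-txjs API and ignores options such as persist or run-until.

```typescript
// Hypothetical model for illustration; not the rx-txjs TxJob implementation.
type Step = (input: string) => string;

class MiniJob {
  private steps: Step[] = [];

  add(step: Step): this {
    this.steps.push(step);
    return this;
  }

  // run every step in the order it was added, chaining output to input
  execute(input: string): string {
    return this.steps.reduce((data, step) => step(data), input);
  }
}

const pipeline = new MiniJob()
  .add((data) => `${data} -> C1`)
  .add((data) => `${data} -> C2`)
  .add((data) => `${data} -> C3`);

const pipelineResult = pipeline.execute('begin');
```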
Understand MountPoints
A 'mountpoint' is a broker object, a meeting point, that lets parts of the program (usually classes) communicate using different styles of publish / subscribe.
A MountPoint offers two-way communication: 'tasks' carries data sent by the caller, and 'reply' lets the receiver send a reply back. Some mountpoints behave differently. For example, TxSinglePoint is a type of mountpoint that has no reply channel; you can only send data. Another example is TxRoutePoint, where the reply is received as the return value of the next method.
A MountPoint is identified by its unique name (a string or a Symbol). The name must be unique within a process, and if you use publication it must be unique among all services / processes.
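The naming rule can be illustrated with a tiny registry keyed by string or Symbol. This is a sketch under assumptions: `MiniRegistry` is a hypothetical class, and rejecting a duplicate name by throwing is a choice made here for illustration, not documented rx-txjs behavior.

```typescript
// Hypothetical registry for illustration; names and behavior are assumptions.
type PointName = string | symbol;

class MiniRegistry<P> {
  private points = new Map<PointName, P>();

  // create fails if the name is already taken, enforcing uniqueness per process
  create(key: PointName, point: P): P {
    if (this.points.has(key)) {
      throw new Error(`mountpoint ${String(key)} already exists`);
    }
    this.points.set(key, point);
    return point;
  }

  // get fails loudly when nothing was registered under the name
  get(key: PointName): P {
    const point = this.points.get(key);
    if (point === undefined) {
      throw new Error(`mountpoint ${String(key)} not found`);
    }
    return point;
  }
}

const registry = new MiniRegistry<{ label: string }>();
const C1 = Symbol('GITHUB::GIST::C1');
registry.create(C1, { label: 'by symbol' });
registry.create('GITHUB::GIST::C2', { label: 'by string' });

let duplicateRejected = false;
try {
  registry.create('GITHUB::GIST::C2', { label: 'again' });
} catch {
  duplicateRejected = true;
}
```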
A Component that creates a mountpoint uses the following:
mountpoint = TxMountPointRegistry.instance.create(<component-name>);
To subscribe on this mountpoint use:
mountpoint.tasks().subscribe(...);
Anywhere else in the code you can send data through this mountpoint using the next method and a TxTask<T> object:
mountpoint = TxMountPointRegistry.instance.get(<component-name>);
mountpoint.tasks().next(TxTask...);
Reply back to all reply subscribers
Usually you want to reply only to the sender. However, there are cases where you want to reply to all subscribers, even those that did not send any data. This can be done by:
mountpoint = TxMountPointRegistry.instance.use(<component-name>);
mountpoint.tasks().next(TxTask...);
Then reply like:
mountpoint.tasks().subscribe(...);
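The difference between replying to the sender and replying to everyone can be sketched as follows. `ReplyHub` and its method names are hypothetical and exist only to show the two delivery modes; they are not part of rx-txjs.

```typescript
// Hypothetical model of "reply to sender" vs "reply to all"; not rx-txjs code.
type ReplyHandler = (message: string) => void;

class ReplyHub {
  private subscribers = new Map<string, ReplyHandler>();

  subscribe(id: string, handler: ReplyHandler): void {
    this.subscribers.set(id, handler);
  }

  // deliver only to the subscriber that sent the original task
  replyToSender(senderId: string, message: string): void {
    this.subscribers.get(senderId)?.(message);
  }

  // deliver to every subscriber, including those that never sent anything
  replyToAll(message: string): void {
    this.subscribers.forEach((handler) => handler(message));
  }
}

const hub = new ReplyHub();
const inboxA: string[] = [];
const inboxB: string[] = [];
hub.subscribe('a', (message) => inboxA.push(message));
hub.subscribe('b', (message) => inboxB.push(message));

hub.replyToSender('a', 'only for a');
hub.replyToAll('for everyone');
```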
Let's look at some more mountpoints
- TxSinglePoint - implements a 1:1 one-way communication channel with the Component. Use this MountPoint to send data; to get a reply you have to provide your own (other) TxSinglePoint object.
- TxDoublePoint - a wrapper that includes two different SinglePoints to implement bi-directional communication channels with a Component.
- TxRoutePoint - a kind of MountPoint that implements Class-2-Class direct communication over Express (HTTP).
- TxQueuePoint - a kind of MountPoint that implements Class-2-Class direct communication over a queue system.
RoutePoint
A RoutePoint connects two components that live in two different services by using Express. You have to define two routepoints, one on the server and the other on the client.
RoutePoint wraps the Express request and response, but you keep full access to and control over both.
Server Side
Component S1Component defines the server side as follows:
// server side routepoint configuration parameters
// create a routepoint on the server side
routepoint = TxRoutePointRegistry.instance.route('GITHUB::S2', config);
Then define the subscription, which receives the data coming from the client side:
routepoint.tasks().subscribe(...);
The whole code looks like this:
new S1Component();
Client side
The client side communicates with the server.
Define a configuration object as follows:
// server side routepoint configuration parameters
Create the routepoint on the registry as follows:
routepoint = TxRoutePointRegistry.instance.create('GITHUB::C1', config);
NOTE: on the client side you use create, not route as on the server.
To use this client routepoint anywhere in the code:
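reply = await routepoint.tasks().next(new TxRouteServiceTask<any>({source: 'service-a', token: '1234'}, {..}));
NOTE: The object {source: 'service-a', token: '1234'} goes into the HTTP headers of the request.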
The whole code may look like this:
Or you can subscribe to the reply as follows:
routepoint.reply().subscribe(...);
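The idea that the reply arrives as the return value of next can be sketched with a Promise. Everything here is a stand-in: `FakeRoute`, `RouteTask` and the in-process handler are hypothetical, and no HTTP request is made, whereas a real routepoint goes through Express.

```typescript
// Hypothetical client/server pair; a real routepoint goes over HTTP, this does not.
interface RouteTask {
  head: Record<string, string>; // per the text above, head values travel as HTTP headers
  data: unknown;
}

type RouteHandler = (task: RouteTask) => RouteTask;

class FakeRoute {
  constructor(private handler: RouteHandler) {}

  // next resolves with the server's reply instead of publishing it on a channel
  next(task: RouteTask): Promise<RouteTask> {
    return Promise.resolve().then(() => this.handler(task));
  }
}

// "server side": echo the caller's source back together with a status
const route = new FakeRoute((task) => ({
  head: { source: 'S1Component' },
  data: { status: 'ok', from: task.head.source },
}));

// "client side": the reply is the (awaitable) return value of next
const routeReply: Promise<RouteTask> = route.next({
  head: { source: 'service-a', token: '1234' },
  data: { hello: 'server' },
});
```

Inside an async function the client would simply write `const reply = await route.next(...)`.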
Task
A task is a generic object made of a head and data, something like this:
You can use it as follows:
// or let h: head = t.getHead();
// or let d: Data = t.getData()
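A minimal sketch of the head-plus-data shape follows. `MiniTask`, `Head` and `Data` are hypothetical types written for this example; the real TxTask class may carry more fields and a different constructor.

```typescript
// Hypothetical re-creation of the "head + data" shape; the real TxTask may differ.
interface Head {
  method: string;
}

interface Data {
  text: string;
}

class MiniTask<T> {
  constructor(public head: Head, public data: T) {}

  getHead(): Head {
    return this.head;
  }

  getData(): T {
    return this.data;
  }
}

const task = new MiniTask<Data>({ method: 'run' }, { text: 'Hello' });
const taskHead: Head = task.getHead();
const taskData: Data = task.getData();
```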
Using a Job
A Job runs a set of components one after the other, in the same order as they were added.
- First create:
let job = new Job('job-1')
- Then add some components to it by using the add method:
job.add(TxMountPointRegistry.instance.get(<component-name>))
- Finally, launch the execution with the execute, step or continue methods.
Job execution methods
- execute - run all the components one after the other.
- continue - continue running the job from where it stopped; this is usually relevant when the job is rebuilt after serialization.
TxJob Functionality
- execute - run the components one after the other
  - option: persistence - (toJSON / upJSON) before every component activation, serialize the job into persistent storage so it can recover and continue from the exact same place.
  - option: until - run until a certain component and stop. Execution can resume with the execute / continue and step methods.
  - option: record - save the data passed between components so it can be used for debugging and regression tests.
- continue - run a job from the exact place where it stopped.
- step - run the job in single steps. When each step completes, an event is raised to indicate completion.
- toJSON / upJSON - serialize / deserialize methods.
- Error handling - if a component has an error, it replies through the tasks mountpoint error channel. The job then halts its execution and starts running the error sequence on all the components that were already called.
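How run-until, continue and step relate to each other can be sketched with a simple cursor over the component list. `SteppedJob` is a hypothetical model, and it assumes that "until" stops just before the named component runs; the real TxJob may define the stopping point differently.

```typescript
// Hypothetical model of run-until, continue and step; not the TxJob implementation.
class SteppedJob {
  private index = 0; // position of the next component to run
  readonly trace: string[] = []; // components that already ran

  constructor(private names: string[]) {}

  // run components in order; stop before `until` if it is given (an assumption)
  execute(until?: string): void {
    while (this.index < this.names.length && this.names[this.index] !== until) {
      this.trace.push(this.names[this.index]);
      this.index += 1;
    }
  }

  // resume from wherever the previous call stopped
  continue(): void {
    this.execute();
  }

  // run exactly one component, if any is left
  step(): void {
    if (this.index < this.names.length) {
      this.trace.push(this.names[this.index]);
      this.index += 1;
    }
  }
}

const stepped = new SteppedJob(['C1', 'C2', 'C3']);
stepped.execute('C2'); // runs C1, stops before C2
const afterUntil = [...stepped.trace];
stepped.step(); // runs C2
stepped.continue(); // runs C3
```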
TxJob events
TxJob emits the events isCompleted, isStopped and onComponent.
- isCompleted - sent when all components were executed. Use job.getIsCompleted().subscribe(..);
job.add...
- isStopped - on single step, sent after each step is completed. Use job.getIsStopped().subscribe(..);
- onComponent - notifies on every reply from a component. Use job.getOnComponent().subscribe(..);
Using once:
TxJobRegistry.instance.once('job: ' + job.getUuid(), ...);
Persistence
Persistence enables you to serialize a job, store it in some external storage, then reconstruct it later on and continue the execution (with the continue method) from exactly the same place. There are two options: using TxJobPersistAdapter, or using the low-level job.toJSON and job.upJSON methods.
Using TxJobPersistAdapter
- Create a class that implements TxJobPersistAdapter. Implement the save and read methods.
  TxJobPersistAdapter.save(uuid: string, json: TxJobJSON, name?: string): called by the framework when a job needs to be persisted.
  TxJobPersistAdapter.read(uuid: string): TxJobJSON: called by the framework when it needs to reconstruct a Job.
- Register your class on the TxJobRegistry as follows:
// class that implements TxJobPersistAdapter
TxJobRegistry.instance.driver = persist;
Note: TxJobJSON is the serialized job type.
This stores the job state before executing each component. How save works is up to your storage implementation. Persistence usually goes together with the run-until execution option. Using execute with run-until:
// execute the job until it reaches component <component-name | component Symbol>
job.execute(new TxTask({..}, {..}), {..} as TxJobExecutionOptions);
Once a job is persisted, its state is saved on the storage and removed from the registry (if persist.destroy is true). So to continue the execution you need to do:
// on any part in the code
job.continue(..)
Job.continue(new TxTask(..));
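An adapter with the save and read signatures quoted above could look roughly like the in-memory sketch below. `JobJSON` and `PersistAdapter` are local stand-ins declared here so the example is self-contained; the field list is taken from the JSON shown in the TxJob::continue example, the `void` return type of save is an assumption, and the real rx-txjs types may differ.

```typescript
// Local stand-ins that mirror the signatures quoted above; the real types
// live in rx-txjs and may carry more fields.
interface JobJSON {
  name: string;
  block: string;
  stack: string;
  trace: string;
  single: boolean;
  current: string;
}

interface PersistAdapter {
  save(uuid: string, json: JobJSON, name?: string): void;
  read(uuid: string): JobJSON;
}

// Keeps serialized jobs in memory; a real adapter would use a database or a file.
class InMemoryPersist implements PersistAdapter {
  private store = new Map<string, string>();

  save(uuid: string, json: JobJSON, _name?: string): void {
    this.store.set(uuid, JSON.stringify(json));
  }

  read(uuid: string): JobJSON {
    const raw = this.store.get(uuid);
    if (raw === undefined) {
      throw new Error(`no persisted job for uuid ${uuid}`);
    }
    return JSON.parse(raw) as JobJSON;
  }
}

const persist = new InMemoryPersist();
persist.save('job-uuid-1', {
  name: 'GitHub',
  block: 'GITHUB::GIST::C1,GITHUB::GIST::C2,GITHUB::GIST::C3',
  stack: 'GITHUB::GIST::C2,GITHUB::GIST::C3',
  trace: 'GITHUB::GIST::C1',
  single: false,
  current: 'GITHUB::GIST::C2',
});
const restored = persist.read('job-uuid-1');
```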
Recording
With the recording feature you can record all the data passed between components during execution.
Record Persist Driver
This driver is used by the job to save component data. It is a class that implements the interface:
So to use recording you need to:
- Call TxJobRegistry.instance.setRecorderDriver(recorder); where 'recorder' is a class that implements TxRecordPersistAdapter.
- Turn on record: true in TxJobExecutionOptions, for example:
job.execute(new TxTask({..}, {..}), {record: true} as TxJobExecutionOptions);
Error Handling
If a component decides it has an error, it replies back to the world with:
this.mountpoint.tasks().error(new TxTask<..>(.. I have an error ..))
The job then stops its current execution and starts the error handling sequence. It sends an error message to every component that was already called, in backward order.
for (every component already called, in backward order) do:
mountpoint.tasks().error(..)
done
This calls the error callback on every component that needs to clean up its state.
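The backward error sequence can be sketched as a loop over the components that were already called. The names below (`CleanableComponent`, `runWithCleanup`) are hypothetical, and the sketch assumes the failing component itself also receives the error notification, which the text above does not state explicitly.

```typescript
// Hypothetical model of the error sequence; not the rx-txjs implementation.
interface CleanableComponent {
  name: string;
  run(): void; // may throw to signal an error
  onError(): void; // clean-up callback
}

function runWithCleanup(components: CleanableComponent[]): string[] {
  const called: CleanableComponent[] = [];
  const log: string[] = [];
  for (const component of components) {
    called.push(component);
    try {
      component.run();
      log.push(`run ${component.name}`);
    } catch {
      log.push(`error in ${component.name}`);
      // notify every component that was already called, most recent first
      for (const previous of [...called].reverse()) {
        previous.onError();
        log.push(`cleanup ${previous.name}`);
      }
      break; // the job stops; later components never run
    }
  }
  return log;
}

const makeComponent = (name: string, fails = false): CleanableComponent => ({
  name,
  run: () => {
    if (fails) throw new Error(`${name} failed`);
  },
  onError: () => {},
});

const errorLog = runWithCleanup([
  makeComponent('C1'),
  makeComponent('C2', true),
  makeComponent('C3'),
]);
```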
Backward / Forward
When an error occurs you can force the job to run forward or backward over the component list from the point where it happened. Use the TxJobExecutionOptions.error.direction property of execute / continue to set it.
Distribute Job
First define your components, for example S1Component, S2Component and S3Component, like this:
Now you need to provide rx-txjs with an object that implements the TxDistribute interface. This object has to do two things:
- Implement the send method of TxDistribute, which sends data to a queue, Express or another method of distribution.
- On receiving, push the data to TxDistributeComponent.
You can use the built-in TxDistributeBull from the rx-txjs-adapters package, which does all the work using the bull library.
// import TxDistributeBull from rx-txjs-adapters package
// set the distributer so the job will know where to send the data
TxJobRegistry.instance.setDistribute(new TxDistributeBull('redis://localhost:6379'));
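The two responsibilities of a distributer (send on one side, push to the receiving component on the other) can be sketched with an in-memory queue. `Envelope`, `Distributor` and `LocalQueueDistributor` are hypothetical shapes invented for this example; the actual TxDistribute interface and its send signature are not shown in this document and may differ.

```typescript
// Hypothetical shapes for illustration; the real TxDistribute interface may differ.
interface Envelope {
  component: string; // name of the component that should receive the data
  data: unknown;
}

interface Distributor {
  send(envelope: Envelope): void;
}

// In-memory stand-in for a real queue such as one backed by Redis.
class LocalQueueDistributor implements Distributor {
  private queue: Envelope[] = [];

  constructor(private onReceive: (envelope: Envelope) => void) {}

  // sending only enqueues; delivery happens when the queue is drained
  send(envelope: Envelope): void {
    this.queue.push(envelope);
  }

  // deliver queued envelopes to the receiving side in FIFO order
  drain(): void {
    while (this.queue.length > 0) {
      const envelope = this.queue.shift();
      if (envelope !== undefined) {
        this.onReceive(envelope);
      }
    }
  }
}

const delivered: string[] = [];
const distributor = new LocalQueueDistributor((envelope) => {
  delivered.push(envelope.component);
});
distributor.send({ component: 'GITHUB::S1', data: { step: 1 } });
distributor.send({ component: 'GITHUB::S2', data: { step: 2 } });
const deliveredBeforeDrain = delivered.length;
distributor.drain();
```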
Now you can define the job and run it in a distributed manner.
// create the job and add its components
job.add(TxSinglePointRegistry.instance.get('GITHUB::S1'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S2'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S3'));
// define a callback for when the job is completed
TxJobRegistry.instance.once('job: ' + job.getUuid(), ...);
// now execute the job with the *publish* flag turned on
job.execute(new TxTask({..}, {..}), {..} as TxJobExecutionOptions);
- Here is the complete example
new S1Component();
new S2Component();
new S3Component();
TxJobRegistry.instance.setDistribute(new TxDistributeBull('redis://localhost:6379'));
logger.info('tx-job-distribute.spec: check running S1-S2-S3 through distribute');
job.add(TxSinglePointRegistry.instance.get('GITHUB::S1'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S2'));
job.add(TxSinglePointRegistry.instance.get('GITHUB::S3'));
TxJobRegistry.instance.once('job: ' + job.getUuid(), ...);
job.execute(new TxTask({..}, {..}), {..} as TxJobExecutionOptions);
Routepoint, Publication and Discovery
Rx-TXJS supports a type of mountpoint called routepoint. A routepoint enables communication between two Components on two different services with the same next/subscribe API that is used locally within a service.
Defining a routepoint involves defining two components, one for the server and one for the client.
server-side
- The component may look like this:
GITHUB::R1: the name of the component; it must be unique among ALL services.
Headers: these go into the HTTP headers of the request.
response: this defines how to send the response to the client.
source: 'R1Component', status: "ok": any data object returned back to the client.
client-side
- (without publication) may look like this; this routepoint is defined internally or on another service:
// first create the client side routepoint. This is done once on initialization
TxRoutePointRegistry.instance.create('GITHUB::R1', config);
// then use it where you want to get an already defined routepoint on the client side
// subscribe to the reply from the receiver (the server)
routepoint.subscribe(...);
// make the call, send {source: 'back-client-main'}, {from: 'clientRoutePoint'} to the server
If you use publication then all you have to do is:
// then use it where you want to get an already defined routepoint on the client side
// subscribe to the reply from the receiver (the server)
routepoint.subscribe(...);
// make the call, send {source: 'back-client-main'}, {from: 'clientRoutePoint'} to the server
Publication and discovery free you from worrying about where the routepoint is. They save you from defining const config = TxRouteServiceConfig {...} and calling TxRoutePointRegistry.instance.create('GITHUB::R1', config). So now you can build a complete micro-service architecture, with business logic based on routepoints, with great flexibility.
See https://github.com/tsemach/typescript-publisher for more details.
API
TxJob::execute
Run all components one after the other. A TxTask is passed between components: the output of one component is the input of the next.
arguments
- TxTask: an object including your data
- options: a directive for the execution. until: {until: 'GITHUB::GIST::C2'} runs until component GITHUB::GIST::C2 and then stops. Use continue to resume the process.
usage
// NOTE: you can use Symbols as well (see TxJob above).
// or create through the TxJobRegistry
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C1'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C2'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C3'));
job.execute(new TxTask('create', '', {..}));
TxJob::continue
Continue running the job from its current state. This is useful when rebuilding the Job with upJSON (deserialize).
arguments
- TxTask: an object including your data
usage
let job = new TxJob();
let from = {
  "name": "GitHub",
  "block": "GITHUB::GIST::C1,GITHUB::GIST::C2,GITHUB::GIST::C3",
  "stack": "GITHUB::GIST::C2,GITHUB::GIST::C3",
  "trace": "GITHUB::GIST::C1",
  "single": false,
  "current": "GITHUB::GIST::C2"
}
let after = job.upJSON(from).toJSON();
job.continue(new TxTask('continue', '', {something: 'more data here'}));
TxJob::toJSON
Serialize TxJob to JSON so it can be persisted and rebuilt later on.
// or create through the TxJobRegistry
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C1'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C2'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C3'));
TxJob::upJSON
Deserialize TxJob from JSON. Together with continue, you can store the JSON in a database (or some other persistent storage) and then rebuild the job.
usage
// or create through the TxJobRegistry
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C1'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C2'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C3'));
// run one step, C1Component is running
job.step(new TxTask('step-1', '', {..}));
// running the remaining C2Component and C3Component
other.continue(new TxTask('continue', '', {..}));
TxJob::reset
Return the job to its initial state so it can run again.
usage
// or create through the TxJobRegistry
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C1'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C2'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C3'));
job.execute(new TxTask('create', '', {..}));
job.reset();
job.execute(new TxTask('create', '', {..}));
TxJob::undo
Run the undo sequence on a previous execute / continue. Undo sends an undo message to each component that was already executed, in forward or backward order.
A component registers for the undo message with:
this.mountpoint.undo().subscribe(...)
For example, if the chain includes C1, C2 and C3, then after the execution, calling undo with backward order initiates a sequence of undo calls to each component in reverse order: C3, C2, C1.
usage
// or create through the TxJobRegistry
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C1'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C2'));
job.add(TxMountPointRegistry.instance.get('GITHUB::GIST::C3'));
job.execute(new TxTask('create', '', {..}));
job.undo(backword);
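The ordering rule for undo can be written down in a few lines. `undoOrder` and `Direction` are hypothetical helpers used only to show which component is notified first in each direction; they are not part of the rx-txjs API.

```typescript
// Hypothetical model of undo ordering; not the rx-txjs implementation.
type Direction = 'forward' | 'backward';

function undoOrder(executed: string[], direction: Direction): string[] {
  // copy first so the caller's execution trace is left untouched
  return direction === 'backward' ? [...executed].reverse() : [...executed];
}

const executedChain = ['C1', 'C2', 'C3'];
const backwardUndo = undoOrder(executedChain, 'backward');
const forwardUndo = undoOrder(executedChain, 'forward');
```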