Introduction
When loading a Data Warehouse, it is important to have full control of the processing units that make up the load. Each processing unit must be carefully monitored, both to detect any errors that occur and to analyze execution times.
In this article, which uses the messaging techniques described in the slideshare [http://www.slideshare.net/jackbim/recipe-7-of-data-warehouse-a-messaging-system-for-oracle-dwh-1], I enrich the Micro ETL Foundation (MEF) by building a control system for the PL/SQL processing units.
With the MEF messaging system, we have seen how to emit messages that are then stored in a log table. The demonstration tests showed how a suitable sequence of messages, together with the correct settings of the package variables, gives an idea of the processing flow and of the delay between one message and the next.
The setup of those tests could be called an "excess of zeal". In fact, the purpose of the messaging system was much less ambitious: it only had to provide generic information at any moment of the ETL process, simply by calling a procedure.
It is now time to take the next step and implement an agile control system for the processing units. The goal is always the same: it must be simple and non-invasive. This means that it can be plugged into existing DWH systems (and not only those) gradually, without changing the processing flow. There is no doubt, however, that if you have already set up your ETL process in a modular way, applying the techniques described here will be simpler and more natural.
I will use concepts and definitions already given in the messaging system. Now let us concentrate on the concept of modularity, which is fundamental to the control system.
Modularity and sequencing
The concept of modularity is the basis of the techniques presented here. As we know, a complex system (and the ETL process of a Data Warehouse is without doubt very complex) can be managed and understood only if we break its overall complexity into less complex components. In other words, you cannot have one big main program that contains thousands of lines of code. This is the first point.
The second point is sequencing. We must try to think, and it is almost always possible, of each component of the process as connected to the next, so that their sequential execution leads to the final loading of the Data Warehouse. Please note that I am not saying parallelism is impossible, but identifying which components are completely independent of each other (so that they can run in parallel) is not an easy task, without forgetting all the problems of synchronizing them.
Moreover, parallelism also requires a specific hardware structure and specific Oracle settings. And the performance improvement, I speak from my experience, is not guaranteed. I therefore suggest applying parallelism to the objects rather than to the processes. Usually the dimension tables can be loaded in parallel (if there are no logical connections between them), but why complicate our lives if we can reason in a simple sequential way?
Recall that simplicity is a pillar of the Micro ETL Foundation. So the advice is: modularity and sequencing. Do not forget that the ETL process is inherently sequential in its basic steps. You cannot load a level 2 Data Mart before loading the level 1 Data Mart. And the level 1 Data Mart cannot be loaded until the dimensions are loaded, which in turn cannot be loaded before the Staging Area tables, and so on.
The figure below shows the concepts of modularity and sequencing applied to a hypothetical schedule S1. On the left we have the "logical" components of the ETL process, i.e. not code but names configured in a table. On the right we have the "physical" components, i.e. the real programming code.
Requirements
The main requirement, as already mentioned, is to have control of each processing unit (unit) that makes up the ETL process. Having control means that, for each unit, at any time, I need to know:
• when it started and when it ended;
• how long its execution took;
• whether it succeeded or had a problem (exception);
• if an exception occurred, what the error was;
• what the consequences of the error are.
Meeting those requirements does not require very complex structures. In accordance with the MEF philosophy, I will use just a configuration table (MEF_UNIT_CFT), which holds the main characteristics of the units that make up the loading job, and a logging table (MEF_UNIT_LOT), which holds the log of the executions.
The most important information in the configuration table, along with some context information, is the continuity flag. It lets me decide whether an error is critical (i.e. it must abort the job that called the unit) or not critical (i.e. it must allow the next unit to run).
To log the execution, you have to "surround" the unit call with a procedure call that logs the beginning and a procedure call that logs the end.
In addition, error situations should be treated uniformly by a single procedure, which logs the error and applies the consequences according to the continuity flag.
To control the behavior of a unit means, first of all, to understand its life cycle in the global context of the job to which it belongs. To that end, we will use the theory of finite-state machines or, to be a bit more modern, a simplified version of the state diagrams of the Unified Modeling Language.
The state diagram
As stated previously, the MEF control system must surround the execution of every processing unit with a procedure call (p_start) that registers its beginning and a procedure call (p_end) that registers its end. In practice, the program code (we will see it clearly in the test case) must contain calls like:
p_start
<unit call x>
p_end
p_start
<unit call y>
p_end
...
This seems simple, but exceptions during execution complicate the picture. These exceptions should be handled by a procedure call (p_exc) that is always present inside the unit. A state diagram is the most useful tool for understanding the unit life cycle.
The figure shows the various states and the possible state changes of the units inside a block of executions. Everything shown graphically has been translated into PL/SQL.
Because each unit must be preceded by a procedure call that logs the beginning of the execution, the p_start procedure places the unit in the "Running" state and sets the return code to "?", since the outcome is not yet known. It is important to note that p_start must be followed only by the call of the unit and by no other procedure.
The unit can conclude its run in two different ways: it finishes without any problem (i.e. no Oracle error) or it fails. In the first case the p_end procedure is called, which leads to the "Done" terminal state with return code = "OK". In the second case, depending on the setting of the continuity flag in the configuration table, the unit can behave in two different ways:
• The unit ends definitively and thus prevents the execution of any other unit: it switches to the "Aborted" state and sets the return code = "NOT OK".
• The unit ends with a warning: it switches to the "Done" state, but with "OK (Warning)" as return code. This does not prevent the execution of the next unit. In fact, from this state it is again possible to call p_start, which will switch the next unit to the "Running" state. The p_end call that follows the unit will do nothing, leaving the unit in its final state.
Any state change that is not present in the diagram always produces an error message. This guards against call sequences that are wrong through oversight (e.g. a forgotten p_end, or a p_end called more than once).
The exception management
Since the most complex state change is related to error situations, let us briefly explore exception management (of which we have already seen some examples in the messaging techniques).
In PL/SQL, an error situation that occurs while the program is running is called an "exception". The error condition can be generated by the program itself (e.g. a division by zero) or forced by the logic of the program (e.g. an amount that does not exceed a certain threshold).
In the latter case, the exception is raised explicitly with the RAISE_APPLICATION_ERROR procedure. Regardless of the cause of the error, officially identified as an internal exception or a user-defined exception, the Oracle PL/SQL engine transfers control of the program to the exception handler of the running module (or PL/SQL block), in practice the code after the EXCEPTION keyword.
Obviously, if the EXCEPTION keyword is not present, the module ends immediately because it has no instructions for handling the error, and the error is passed to its caller.
Let us now analyze error propagation. If several procedures are nested, an error not handled by the innermost procedure propagates to the calling procedure, and so on up to the main program. If the error is handled, the caller continues its regular work (unless the exception handler contains the RAISE keyword or itself fails with an error).
To clarify the positioning of the exception call, let us anticipate the structure of the program code, using the next figure.
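The propagation rule can be tried with a small anonymous block that does not depend on MEF. This is only a sketch: the procedure names p_inner and p_outer and the error number -20001 are hypothetical, chosen for illustration. Run it in SQL*Plus with SERVEROUTPUT enabled.

```sql
set serveroutput on
declare
  -- Innermost procedure (hypothetical): raises a user-defined error and has
  -- no handler, so the error propagates to its caller.
  procedure p_inner is
  begin
    raise_application_error(-20001, 'amount below threshold');
  end;

  -- Caller (hypothetical): handles the error, so the main block continues.
  procedure p_outer is
  begin
    p_inner;
    dbms_output.put_line('not reached: control went to the handler');
  exception
    when others then
      dbms_output.put_line('handled in p_outer: ' || sqlerrm);
  end;
begin
  p_outer;
  dbms_output.put_line('main program continues');
end;
/
```

If the handler in p_outer ended with RAISE, the error would continue up to the main block and the last message would not be printed.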
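In code form, the structure can be sketched as follows. The sketch assumes that the MEF_UNIT package described later in this article is installed and that a unit named 'unit_test.module1' is configured for a job 'J1' of schedule 'S1'; these names are hypothetical. The unit is written here as a local procedure only to keep the example in one block.

```sql
declare
  -- Unit level: the exception handler lives inside the unit
  -- and delegates everything to mef_unit.p_exc.
  procedure module1 is
    v_module_cod varchar2(60) := 'unit_test.module1';
  begin
    null; -- the real work of the unit goes here
  exception
    when others then mef_unit.p_exc(v_module_cod, sqlerrm);
  end;
begin
  -- Job level: the unit call is surrounded by p_start and p_end,
  -- with nothing else in between.
  mef_unit.p_start('S1', 'J1', 'unit_test.module1');
  module1;
  mef_unit.p_end;
end;
/
```

Keeping the handler inside the unit means the job code never needs its own error logic: p_exc decides, from the configuration, whether the job continues or aborts.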
Design
The design of the unit control system consists of two tables and a sequence. The MEF_UNIT_CFT table is the configuration table of the processing units. The MEF_UNIT_LOT table keeps the log of the executions of the units. The MEF_UNIT_LOT_SEQ sequence gives a sequence number to each log line.
drop table mef_unit_cft cascade constraints purge;
create table mef_unit_cft
(
  sched_cod       varchar2(30) not null,
  job_cod         varchar2(60) not null,
  unit_cod        varchar2(61) not null,
  sort_cnt        number,
  continue_flg    number,
  unit_active_flg number,
  job_active_flg  number
)
;
drop table mef_unit_lot cascade constraints purge;
create table mef_unit_lot
(
  seq_num     number         not null,
  day_cod     number         not null,
  sched_cod   varchar2(30)   not null,
  job_cod     varchar2(60)   not null,
  unit_cod    varchar2(61)   not null,
  exec_cnt    number         not null,
  status_cod  varchar2(60)   not null,
  return_cod  varchar2(30)   not null,
  ss_num      number         not null,
  mi_num      number         not null,
  hh_num      number         not null,
  elapsed_txt varchar2(4000) not null,
  errmsg_txt  varchar2(4000),
  stamp_dts   date           not null
)
;
alter table mef_unit_cft add (
  constraint mef_unit_cft_pk1
  primary key
  (job_cod, unit_cod));
drop sequence mef_unit_lot_seq;
create sequence mef_unit_lot_seq nocache;
Let us now look at these objects in detail.
The MEF_UNIT_LOT_SEQ sequence
It is more reliable than the time stamp for sorting the table. Because units sometimes begin and end within fractions of a second of each other, the time stamp might not be sufficiently discriminating.
The MEF_UNIT_CFT table
This table contains the configuration information of the processing units. In it you configure schedules, jobs and units. For the purposes of the unit control system, jobs and schedules will be used in a static way, as parameters to the procedure calls.
Their management will be explained, in the future, in the control system of the jobs. Likewise, the SORT_CNT, UNIT_ACTIVE_FLG and JOB_ACTIVE_FLG fields have no immediate use, and we do not have to set them.
- SCHED_COD: Identifier of the schedule to which the job belongs. It is a logical entity, in the sense that we have to think of it as the identifier of a list of jobs.
- JOB_COD: Identifier of the job. It is a logical entity, in the sense that we have to think of it as the identifier of a list of processing units.
- UNIT_COD: Identifier of the processing unit within the job. We can think of it as an Oracle packaged procedure.
- SORT_CNT: Position of the unit inside the job.
- CONTINUE_FLG: Continuity flag. If set to 1, the next unit can run even after an error in this unit; if set to 0, the error is blocking and the job must abort.
- UNIT_ACTIVE_FLG: Flag indicating whether the unit is active.
- JOB_ACTIVE_FLG: Flag indicating whether the job is active.
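As an illustration, a job could be configured as in the sketch below. The schedule 'S1', the job 'J1' and the choice of flags are assumptions made for the example, not values taken from the test case. The unit codes must match exactly, case included, the values later passed to the procedure calls.

```sql
-- Hypothetical configuration of job J1 in schedule S1.
insert into mef_unit_cft
  (sched_cod, job_cod, unit_cod, sort_cnt, continue_flg, unit_active_flg, job_active_flg)
values
  ('S1', 'J1', 'unit_test.module1', 1, 0, 1, 1);  -- continue_flg = 0: an error aborts the job

insert into mef_unit_cft
  (sched_cod, job_cod, unit_cod, sort_cnt, continue_flg, unit_active_flg, job_active_flg)
values
  ('S1', 'J1', 'unit_test.module_w', 2, 1, 1, 1); -- continue_flg = 1: an error is only a warning

commit;
```

Because the primary key is (JOB_COD, UNIT_COD), the same unit can appear only once per job.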
The MEF_UNIT_LOT table
This table stores all the information about the executions of all processing units. Its timing information is very similar to that of the messaging system, but it also keeps the unit status and the final outcome of the execution.
- SEQ_NUM: Sequential number of the line, obtained from the Oracle sequence.
- DAY_COD: Day of the execution in YYYYMMDD (year, month, day) format.
- SCHED_COD: Identifier of the schedule to which the job belongs.
- JOB_COD: Identifier of the job.
- UNIT_COD: Identifier of the processing unit within the job.
- EXEC_CNT: Identifier of the job execution. Every job execution should be tagged with a number, itself extracted from an Oracle sequence.
- STATUS_COD: State of the unit.
- RETURN_COD: Return code of the execution of the unit.
- SS_NUM: Number of seconds consumed by the processing unit. Like the next two fields, it is a numeric statistic that can be summed.
- MI_NUM: Number of minutes consumed by the processing unit.
- HH_NUM: Number of hours consumed by the processing unit.
- ELAPSED_TXT: Execution time in HH24MISS format.
- ERRMSG_TXT: Error message.
- STAMP_DTS: Time stamp.
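Once some units have run, the log can be read with ordinary queries. The two below are only a sketch of what such checks might look like; they use nothing beyond the columns just described.

```sql
-- Today's executions, in the order in which they were logged.
-- SEQ_NUM is used for sorting because the DATE time stamp
-- has a resolution of one second.
select seq_num, job_cod, unit_cod, status_cod, return_cod, elapsed_txt, errmsg_txt
from   mef_unit_lot
where  day_cod = to_number(to_char(sysdate, 'yyyymmdd'))
order  by seq_num;

-- Units that have not (or not yet) ended with a clean 'OK', per job execution.
-- This includes units still running ('?'), warnings and aborts.
select exec_cnt, job_cod, count(*) as problem_units
from   mef_unit_lot
where  return_cod <> 'OK'
group  by exec_cnt, job_cod
order  by exec_cnt;
```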
The MEF_UNIT package
This package is the core of the control system.
/*******************************************************************************
* Package Specification
*******************************************************************************/
CREATE OR REPLACE package mef_unit is
  type pt_work_rec is record (
    status_num    number,
    lot_row       mef_unit_lot%rowtype,
    cft_row       mef_unit_cft%rowtype,
    fail_list_txt varchar2(4000),
    fail_unit_cnt number,
    errmsg_txt    varchar2(4000),
    continue_flg  number
  );
  pv_work_rec pt_work_rec;
  function f_concat(p_curr varchar2,p_add varchar2, p_size number,p_sep varchar2)
    return varchar2;
  procedure p_start(
     p_sched_cod varchar2
    ,p_job_cod varchar2
    ,p_unit_cod varchar2);
  procedure p_end;
  procedure p_exc(p_unit_cod varchar2,p_errmsg_txt varchar2);
  procedure p_ins_unit_lot(p_row in out mef_unit_lot%rowtype);
  procedure p_upd_unit_lot(p_row in out mef_unit_lot%rowtype);
  procedure p_get_cft(p_sched_cod varchar2,p_job_cod varchar2
    ,p_unit_cod varchar2,p_cft_row in out mef_unit_cft%rowtype);
end;
/
sho errors
/*******************************************************************************
* Package Body
*******************************************************************************/
CREATE OR REPLACE package body mef_unit as
  pv_pkg varchar2(30) := 'mef_unit.';
  pv_error EXCEPTION;
  pragma exception_init (pv_error, -20058);

  function f_concat(p_curr varchar2,p_add varchar2, p_size number,p_sep varchar2)
    return varchar2 as
    v_module_cod varchar2(61) := pv_pkg||'f_concat';
    v_out varchar2(4000);
  begin
    if (p_curr is null) then
      v_out := substr(p_add,1,p_size);
    else
      if (length(p_curr)+length(p_add)+1 > p_size) then
        v_out := substr(p_curr||'...',1,p_size);
      else
        v_out := p_curr||p_sep||p_add;
      end if;
    end if;
    return v_out;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_ins_unit_lot(p_row in out mef_unit_lot%rowtype) is
    pragma autonomous_transaction;
    v_module_cod varchar2(61) := pv_pkg||'p_ins_unit_lot';
  begin
    insert into mef_unit_lot values p_row;
    commit;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_upd_unit_lot(p_row in out mef_unit_lot%rowtype) is
    pragma autonomous_transaction;
    v_module_cod varchar2(61) := pv_pkg||'p_upd_unit_lot';
  begin
    mef.delta_time(p_row.stamp_dts,sysdate,p_row.ss_num
      ,p_row.mi_num,p_row.hh_num,p_row.elapsed_txt);
    update mef_unit_lot set
      exec_cnt = p_row.exec_cnt,
      status_cod = p_row.status_cod,
      return_cod = p_row.return_cod,
      ss_num = p_row.ss_num,
      mi_num = p_row.mi_num,
      hh_num = p_row.hh_num,
      elapsed_txt = p_row.elapsed_txt,
      errmsg_txt = p_row.errmsg_txt,
      stamp_dts = p_row.stamp_dts
    where seq_num = p_row.seq_num;
    commit;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_get_cft(p_sched_cod varchar2,p_job_cod varchar2
    ,p_unit_cod varchar2,p_cft_row in out mef_unit_cft%rowtype) is
    v_module_cod varchar2(61) := pv_pkg||'p_get_cft';
  begin
    select * into p_cft_row
    from mef_unit_cft
    where job_cod=p_job_cod
    and unit_cod=p_unit_cod
    --and sched_cod=p_sched_cod
    ;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod,mef.f_str(
      'Sched %1 job %2 unit %3 not found in mef_unit_cft (check case sensitive)',
      p_sched_cod,p_job_cod,p_unit_cod));
  end;

  procedure p_init_unit (
     p_sched_cod varchar2
    ,p_job_cod varchar2
    ,p_unit_cod varchar2) is
    v_module_cod varchar2(61) := pv_pkg||'p_init_unit';
  begin
    mef.pv_job_cod := p_job_cod;
    mef.pv_sched_cod := p_sched_cod;
    mef.pv_unit_cod := p_unit_cod;
    p_get_cft(
      p_sched_cod,p_job_cod,p_unit_cod,pv_work_rec.cft_row);
    pv_work_rec.lot_row.seq_num := mef.f_get_seq_val(
      'mef_unit_lot_seq','nextval');
    pv_work_rec.lot_row.job_cod := p_job_cod;
    pv_work_rec.lot_row.sched_cod := p_sched_cod;
    pv_work_rec.lot_row.unit_cod := p_unit_cod;
    pv_work_rec.lot_row.stamp_dts := sysdate;
    pv_work_rec.lot_row.day_cod := to_char(sysdate,'yyyymmdd');
    pv_work_rec.lot_row.errmsg_txt := null;
    pv_work_rec.lot_row.ss_num := 0;
    pv_work_rec.lot_row.mi_num := 0;
    pv_work_rec.lot_row.hh_num := 0;
    pv_work_rec.lot_row.elapsed_txt := '?';
    pv_work_rec.lot_row.exec_cnt := nvl(mef.pv_exec_cnt,0);
    pv_work_rec.continue_flg := 0;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_exc_continue (p_unit_cod varchar2,p_errmsg_txt varchar2) is
    v_module_cod varchar2(61) := pv_pkg||'p_exc_continue';
  begin
    pv_work_rec.lot_row.status_cod := 'Done';
    pv_work_rec.lot_row.return_cod := 'OK (Warning)';
    pv_work_rec.lot_row.errmsg_txt := p_errmsg_txt;
    pv_work_rec.errmsg_txt :=
      f_concat(pv_work_rec.errmsg_txt,p_errmsg_txt,4000,mef.cr);
    pv_work_rec.status_num := 3;
    p_upd_unit_lot(pv_work_rec.lot_row);
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_exc_abort (p_unit_cod varchar2,p_errmsg_txt varchar2) is
    v_module_cod varchar2(61) := pv_pkg||'p_exc_abort';
  begin
    pv_work_rec.lot_row.status_cod := 'Aborted';
    pv_work_rec.lot_row.return_cod := 'NOT OK';
    pv_work_rec.lot_row.errmsg_txt := p_errmsg_txt;
    pv_work_rec.errmsg_txt := p_errmsg_txt;
    p_upd_unit_lot(pv_work_rec.lot_row);
    pv_work_rec.status_num := 4;
    raise_application_error (-20058, 'Module '||p_unit_cod||' aborted !');
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;
  /*******************************************************************************
  * Entry points
  *******************************************************************************/
  procedure p_start (
     p_sched_cod varchar2
    ,p_job_cod varchar2
    ,p_unit_cod varchar2) is
    v_module_cod varchar2(61) := pv_pkg||'p_start';
    v_str varchar2 (4000);
  begin
    if (nvl(pv_work_rec.status_num,0) in (0,2,3)) then
      p_init_unit(p_sched_cod,p_job_cod,p_unit_cod);
      pv_work_rec.lot_row.status_cod := 'Running';
      pv_work_rec.lot_row.return_cod := '?';
      p_ins_unit_lot(pv_work_rec.lot_row);
      pv_work_rec.status_num := 1;
    else
      raise_application_error (-20058, 'Not allowed');
    end if;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_end is
    v_module_cod varchar2(61) := pv_pkg||'p_end';
  begin
    if (pv_work_rec.status_num = 1) then
      pv_work_rec.lot_row.status_cod := 'Done';
      pv_work_rec.lot_row.return_cod := 'OK';
      p_upd_unit_lot(pv_work_rec.lot_row);
      pv_work_rec.status_num := 2;
    elsif (pv_work_rec.status_num = 3) then
      null;
    else
      raise_application_error (-20058, 'Not allowed');
    end if;
  exception
    when pv_error then raise;
    when others then mef.p_rae(sqlerrm,v_module_cod);
  end;

  procedure p_exc(p_unit_cod varchar2,p_errmsg_txt varchar2) is
    v_module_cod varchar2(61) := pv_pkg||'p_exc';
  begin
    mef.p_send(p_unit_cod,'%1 (continue = %2)'
      ,p_errmsg_txt,pv_work_rec.cft_row.continue_flg);
    if (nvl(pv_work_rec.status_num,0) <> 1) then
      mef.p_send(p_unit_cod,'Not allowed');
      raise_application_error(-20058,p_errmsg_txt);
    else
      pv_work_rec.fail_unit_cnt := nvl(pv_work_rec.fail_unit_cnt,0) + 1;
      pv_work_rec.fail_list_txt :=
        f_concat(
          pv_work_rec.fail_list_txt,p_unit_cod,2000,mef.cr);
      if (pv_work_rec.cft_row.continue_flg = 1) then
        p_exc_continue(pv_work_rec.lot_row.unit_cod,p_errmsg_txt);
      else
        p_exc_abort(pv_work_rec.lot_row.unit_cod,p_errmsg_txt);
      end if;
    end if;
  end;
end;
/
sho errors
I will give a brief description of the main procedures.
p_start
The task of the p_start procedure is to record the start of the processing unit. In practice, the code implements the logic of the state diagram. The initial test checks the current state to see whether p_start is permitted at this point. If we are not in state 0 (i.e. the first processing unit of the job) or in state 2 or 3 (i.e. after the previous unit ended correctly or with a warning), the procedure raises an error that prevents the process from continuing.
If we are in a permitted state, the unit initialization procedure is called and the status and return code variables are set. All this information is then stored in the MEF_UNIT_LOT table. Finally, the current state of the unit is changed.
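The state check can be observed with a deliberately wrong sequence of calls. The sketch below assumes a fresh session, the MEF_UNIT and UNIT_TEST packages of this article, and two units configured under the hypothetical job 'J1' of schedule 'S1'.

```sql
begin
  mef_unit.p_start('S1', 'J1', 'unit_test.module1');  -- state 0 -> 1 (Running)
  unit_test.module1;
  -- p_end has been forgotten here, so the state is still 1
  mef_unit.p_start('S1', 'J1', 'unit_test.module2');  -- raises error -20058 ('Not allowed')
end;
/
```

Since the state is kept in the package variable pv_work_rec, it persists for the whole session: after such an error the state is still 1, and a new p_start in the same session is refused as well.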
p_exc
Exception management begins by immediately recording the anomalous situation in the MEF_MSG_LOT table. Then comes the test on the state, which obviously can only be that of a running unit (i.e. state 1).
Two variables are then updated, whose use we will see in the future, which preserve the history of the units in error:
• fail_unit_cnt is simply a counter of the units that have had problems;
• fail_list_txt concatenates the names of these units, separated by a carriage return.
The next test, based on the continuity flag, invokes the corresponding handling procedure. Let us look at both.
p_exc_continue
This procedure has the task of recording the error situation without blocking the processing flow. It sets the state and the return code of the unit as specified by the state diagram, moves to final state 3 (i.e. ended with warning) and updates the MEF_UNIT_LOT table.
p_exc_abort
This procedure has the task of terminating the processing flow. Like the previous procedure, it sets the final state and the return code and updates the MEF_UNIT_LOT table, but it then ends the running job definitively with RAISE_APPLICATION_ERROR.
p_init_unit
The only task of this procedure is to initialize all the global variables, extracting the configuration from the MEF_UNIT_CFT table on the basis of the input parameters.
The UNIT_TEST package
We are now able to start our tests. To this end, we will build the UNIT_TEST package, which contains a small number of processing units. Its code is for demonstration only. Note that each processing unit handles exceptions according to the standard described above, i.e. using the handler:
when others then mef_unit.p_exc (v_module_cod, sqlerrm);
create or replace package unit_test is
  procedure module1;
  procedure module2;
  procedure module3;
  procedure module_a;
  procedure module_w;
  procedure module_w2;
end;
/
sho errors
create or replace package body unit_test is
  pv_pkg varchar2(30) := 'unit_test.';

  procedure module1 as
    v_module_cod varchar2(60) := pv_pkg||'module1';
    v_num number;
  begin
    select count(*) into v_num from all_objects;
  exception
    when others then mef_unit.p_exc(v_module_cod,sqlerrm);
  end;

  procedure module2 as
    v_module_cod varchar2(60) := pv_pkg||'module2';
    v_num number;
  begin
    select count(*) into v_num from all_objects;
  exception
    when others then mef_unit.p_exc(v_module_cod,sqlerrm);
  end;

  procedure module3 as
    v_module_cod varchar2(60) := pv_pkg||'module3';
    v_num number;
  begin
    select count(*) into v_num from all_objects;
  exception
    when others then mef_unit.p_exc(v_module_cod,sqlerrm);
  end;

  procedure module_a as
    v_module_cod varchar2(60) := pv_pkg||'module_a';
    --v_gg date;
  begin
    execute immediate 'delete pippo';
    --v_gg := to_date('20100140','yyyymmdd');
  exception
    when others then mef_unit.p_exc(v_module_cod,sqlerrm);
  end;

  procedure module_w as
    v_module_cod varchar2(60) := pv_pkg||'module_w';
    v_num number(1);
  begin
    select count(*) into v_num from all_objects;
  exception
    when others then mef_unit.p_exc(v_module_cod,sqlerrm);
  end;

  procedure module_w2 as
    v_module_cod varchar2(60) := pv_pkg||'module_w2';
    v_gg date;
  begin
    v_gg := to_date('20100140','yyyymmdd');
  exception
    when others then mef_unit.p_exc(v_module_cod,sqlerrm);
  end;
end;
/
sho errors
Let's look at what these modules do:
- MODULE1, MODULE2, MODULE3: procedures that finish without problems. They store the number of rows of the ALL_OBJECTS system view in a local variable. This select was chosen because it takes a little time, which lets us check the correctness of the timing information in the MEF_UNIT_LOT table.
- MODULE_W, MODULE_W2: procedures containing instructions that force an error which, thanks to the continuity flag, is non-blocking. MODULE_W fails because the number of rows is surely greater than one digit, which is the constraint associated with the v_num local variable. MODULE_W2 fails because it tries to convert an invalid date.
- MODULE_A: this procedure ends in failure, because it tries to delete from a table (pippo) that is not expected to exist. As its continuity flag says, the error is blocking.
Remember, in your own tests, to exit and re-enter SQL*Plus between one test and the next, to reset the package variables. In addition, pay close attention to the unit names: if you execute p_start for unit X and then run unit Y, the processing log will not be reliable.
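To see the forced errors in isolation, without the control system, the three failing statements can be run in a plain anonymous block. This is only a sketch for experimentation: it prints whatever SQLERRM reports in your environment, and it assumes that no table named pippo is visible to your user.

```sql
set serveroutput on
declare
  v_num number(1);
  v_gg  date;
begin
  -- same statement as module_w: the count does not fit in one digit
  begin
    select count(*) into v_num from all_objects;
  exception
    when others then dbms_output.put_line('module_w : ' || sqlerrm);
  end;

  -- same statement as module_w2: day 40 is not a valid day of the month
  begin
    v_gg := to_date('20100140', 'yyyymmdd');
  exception
    when others then dbms_output.put_line('module_w2: ' || sqlerrm);
  end;

  -- same statement as module_a: delete on a table assumed not to exist
  begin
    execute immediate 'delete pippo';
  exception
    when others then dbms_output.put_line('module_a : ' || sqlerrm);
  end;
end;
/
```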
Installation
Recall that all the code of the unit control system can be downloaded from MEF_01:
https://drive.google.com/folderview?id=0B2dQ0EtjqAOTN3I1MU9JQmpOUEE&usp=sharing
Before you use it, you must install the base of the Micro ETL Foundation, that is, the messaging system. This is the link to MEF_00:
https://drive.google.com/folderview?id=0B2dQ0EtjqAOTaU5WNmc5MkVnVFE&usp=sharing
Its installation is explained on slideshare:
http://www.slideshare.net/jackbim/recipe-7-of-data-warehouse-a-messaging-system-for-oracle-dwh-2
spool mef_unit_install
@mef_unit_ddl.sql
@mef_unit_pkg.sql
@unit_test_pkg.sql
spool off
To install the unit control system, log in to SQL*Plus as the user you created and configured for the messaging system, then run:
SQL> @ mef_unit_install.sql
You do not need to do anything else. We're ready for the test phase.
Test1 (it is all right)
We enter SQL*Plus and launch the unit_test_run1.sql script. The script is very simple.
delete mef_unit_cft where job_cod='job1';
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job1', 'unit_test.module1', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job1', 'unit_test.module2', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job1', 'unit_test.module3', 0);
commit;
begin
mef.pv_exec_cnt := 1;
mef_unit.p_start('s1','job1','unit_test.module1');
unit_test.module1;
mef_unit.p_end;
mef_unit.p_start('s1','job1','unit_test.module2');
unit_test.module2;
mef_unit.p_end;
mef_unit.p_start('s1','job1','unit_test.module3');
unit_test.module3;
mef_unit.p_end;
end;
/
sho errors
exit;
First of all, it configures the units of the test job by inserting one row per unit into the MEF_UNIT_CFT table. It then runs the three units, which will surely end with a positive outcome (it only takes about 30 seconds). As you can see, each unit is delimited by the start/end pair.
Now we can verify the result by looking at the contents of the MEF_UNIT_LOT table. You can enter SQL*Plus and select from the table but, for ease of viewing, I will show the (reduced) result in graphical format.
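If you prefer to check the table yourself, a query as simple as the following is enough. It deliberately selects every column, because the column list of MEF_UNIT_LOT is not repeated here; any filter or ordering you add (for example on the job code) is an assumption to be checked against the table definition in mef_unit_ddl.sql.

```sql
-- show the whole unit log; add a where clause once you know the columns
select *
  from mef_unit_lot;
```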
Test2 (non-blocking errors)
In this second test, we show the behavior of the control system in the case of non-blocking errors. We go into SQL*Plus and launch the unit_test_run2.sql script.
delete mef_unit_cft where job_cod='job2';
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module1', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module_w', 1);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module_w2', 1);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module3', 0);
commit;
begin
mef.pv_exec_cnt := 2;
mef_unit.p_start('s1','job2','unit_test.module1');
unit_test.module1;
mef_unit.p_end;
mef_unit.p_start('s1','job2','unit_test.module_w');
unit_test.module_w;
mef_unit.p_end;
mef_unit.p_start('s1','job2','unit_test.module_w2');
unit_test.module_w2;
mef_unit.p_end;
mef_unit.p_start('s1','job2','unit_test.module3');
unit_test.module3;
mef_unit.p_end;
end;
/
exit;
The script is similar to the previous one: only the calls and the setting of the continuity flag of the units change. The final result clearly shows the errors that job2 encountered at run time and the fact that, since they are non-blocking, the subsequent units were still executed: module_w2 ran after the error in module_w, and module3 ran to completion.
Test3 (fatal error)
In this third test, we show the behavior of the control system in the case of a fatal error. We go into SQL*Plus and launch the unit_test_run3.sql script.
delete MEF_UNIT_CFT where JOB_COD='job3';
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module1', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module_a', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module2', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module3', 0);
commit;
begin
mef.pv_exec_cnt := 3;
mef_unit.p_start('s1','job3','unit_test.module1');
unit_test.module1;
mef_unit.p_end;
mef_unit.p_start('s1','job3','unit_test.module_a');
unit_test.module_a;
mef_unit.p_end;
mef_unit.p_start('s1','job3','unit_test.module2');
unit_test.module2;
mef_unit.p_end;
mef_unit.p_start('s1','job3','unit_test.module3');
unit_test.module3;
mef_unit.p_end;
end;
/
exit;
The final result clearly shows how module_a of job3, configured with continue_flg = 0, prevents the execution of the subsequent units module2 and module3. Of course, these exception situations were also inserted automatically into the message log table.
Conclusions
Like the messaging system, the control system of the processing units is simple and very useful for answering all the questions that we are inevitably asked when there are problems with the loading process of the Data Warehouse. Its simplicity lies in the fact that only three steps are needed in already existing code:
1. Configure the unit.
2. Insert the p_start and p_end procedure calls.
3. Replace the exception handler with the p_exc call.
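The three steps can be sketched on an existing procedure as follows. The procedure load_demo and the job code job_demo are hypothetical names used only for illustration; the table columns and the mef_unit calls are those used in the test scripts above, and the setting of mef.pv_exec_cnt simply mirrors what those scripts do.

```sql
-- Step 1: configure the unit (0 = a failure blocks the job)
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job_demo', 'load_demo', 0);
commit;

-- Existing procedure (hypothetical): only its exception section changes
create or replace procedure load_demo as
  v_module_cod varchar2(60) := 'load_demo';
  v_num number;
begin
  -- stands in for the real business logic
  select count(*) into v_num from all_objects;
exception
  -- Step 3: replace the exception handler with the p_exc call
  when others then mef_unit.p_exc(v_module_cod, sqlerrm);
end;
/
sho errors

-- Step 2: wrap the call with p_start and p_end
begin
  mef.pv_exec_cnt := 1;  -- set as in the test scripts
  mef_unit.p_start('s1', 'job_demo', 'load_demo');
  load_demo;
  mef_unit.p_end;
end;
/
```

Note that the unit code passed to p_start must match both the configured unit_cod and the name passed to p_exc, otherwise the log will not be reliable.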
Soon, we will also see some scheduling techniques: based on the configuration table described here, we will launch a loading job without having to insert the p_start and p_end calls.
Everything will be automatic and dynamic. We will not have a separate main program for each job. And we will get what I theorized in an article of mine that appeared, years ago, in Data Mart Review:
[The Infrastructural Data Warehouse]
(unfortunately the site no longer exists and has been incorporated into the Information Management site).
We will have implemented a method that, in my opinion, is essential for an ETL process: a clear separation between the infrastructure code and the business code of a Data Warehouse.