并行管道函数
这个例子中要使用两个表:T1和T2。T1是先读的表,T2表用来插入这个信息。我们要用的两个表如下:
sys@JINGYONG> create table t1 2 as 3 select object_id id,object_name text 4 from all_objects; 表已创建。 sys@JINGYONG> begin 2 dbms_stats.set_table_stats 3 (user,'T1',numrows=>100000000,numblks=>100000); 4 end; 5 / PL/SQL 过程已成功完成。 sys@JINGYONG> create table t2 2 as 3 select t1.*,0 session_id 4 from t1 5 where 1=0; 表已创建。
这里使用DBMS_STATS来骗过优化器,让它以为输入表中有10,000,000行,而且占用了100,000个数据库块。在此模拟 一个大表。第二个表T2是第一个表的一个副本,只是在结构中增加了一个SESSION_ID列。可以通过它具体看到是否发生了并行化。接下来,需要建立管道函数返回的对象类型。在这个例子中,对象类型类似于T2:
sys@JINGYONG> create or replace type t2_type 2 as object 3 ( 4 id number, 5 text varchar2(30), 6 session_id number 7 ); 8 / 类型已创建。 sys@JINGYONG> create or replace type t2_tab_type as table of t2_type; 2 / 类型已创建。
现在这个过程是一个生成行的函数。它接收数据作为输入,并在一个引用游标(ref cursor)中处理。这个函数返回一个 T2_TAB_TYPE,这就是我们刚才创建的对象类型。这是一个PARALLEL_ENABLED(启用子并行)的管道函数。在此使用了分区 (partition)子句,这就告诉Oracle:以任何最合适的方式划分或分解数据。我们不需要对数据的顺序做任何假设。
在此,我们只想划分数据。数据如何划分对于我们的处理并不重要,所以定义如下:
sys@JINGYONG> create or replace function parallel_pipelined(l_cursor in sys_refcursor) 2 return t2_tab_type 3 pipelined 4 parallel_enable(partition l_cursor by any) 5 is 6 l_session_id number; 7 TYPE type_t1_data IS TABLE OF t1%ROWTYPE INDEX BY PLS_INTEGER; 8 l_t1 type_t1_data; 9 10 begin 11 select sid into l_session_id 12 from v$mystat 13 where rownum=1; 14 loop 15 fetch l_cursor bulk collect into l_t1;--用bulk collect来一次性获取数据 16 exit when l_t1.count=0; 17 for i in 1 .. l_t1.count loop 18 pipe row(t2_type(l_t1(i).id,l_t1(i).text,l_session_id)); 19 end loop; 20 null; 21 end loop; 22 close l_cursor; 23 return; 24 end; 25 / Function created
或者用下面的过程来一行一行来获取
create or replace function parallel_pipelined( l_cursor in sys_refcursor ) return t2_tab_type pipelined parallel_enable ( partition l_cursor by any ) is l_session_id number; l_rec t1%rowtype; begin select sid into l_session_id from v$mystat where rownum =1; loop fetch l_cursor into l_rec; exit when l_cursor%notfound; pipe row(t2_type(l_rec.id,l_rec.text,l_session_id)); end loop; close l_cursor; return; end;
这样就创建了函数。我们准备并行地处理数据,让Oracle根据可用的资源来确定最合适的并行度:
SQL> insert /*+ append */ 2 into t2(id,text,session_id) 3 select * 4 from table(parallel_pipelined 5 (CURSOR(select /*+ parallel(t1) */ * 6 from t1 ) 7 )) 8 ; 50333 rows inserted SQL> commit; Commit complete
为了查看这里发生了什么,可以查询新插入的数据,并按SESSION_ID分组,先来看使用了多少个并行执行服务器,再看每个并行 执行服务器处理了多少行:
SQL> select session_id,count(*) from t2 group by session_id; SESSION_ID COUNT(*) ---------- ---------- 136 31006 145 19327
显然,对于这个并行操作的SELECT部分,我们使用了2个并行执行服务器,可以看到,Oracle对我们的过程进行了并行化