Python 多线程查询 Oracle:重点是为每个线程建立独立的数据库连接(conn)

python多线程查询oracle 重点 建立多个conn,第1张

以下是可运行的代码;其中每个分片查询的 end 边界(end = value + step)的计算方式仍有待优化。

    def download_material_info2(param):
        """Export material info to CSV, convert it to XLSX and return the URL.

        The total row count is queried first. The rows are then fetched in
        windows of ``step`` rows by a bounded pool of worker threads, each
        using its own Oracle connection. The merged rows are written to a CSV
        file, an external script converts that CSV to XLSX, and the download
        URL is returned.

        Args:
            param: Dict of query filters. It is updated in place: ``buPlant``
                is set, any of ``makerNo`` / ``makerName`` / ``hhNo`` whose
                value contains ``%`` is moved to the matching ``...LIKE`` key
                and blanked, and ``sourcerInvalid`` may be added.

        Returns:
            ``begin_path + head_info.response_path + filename``.

        Raises:
            Exception: ``"Download failed!"`` followed by the original message,
                chained to the underlying error, for any failure in the export.
        """
        # Function-scope import so the block does not rely on file-level imports.
        from concurrent.futures import ThreadPoolExecutor

        print('begin download_material_info')
        step = 7000       # rows requested per windowed query
        max_workers = 8   # upper bound on simultaneously open chunk connections
        sql_conn = None   # bound up front so cleanup is safe if Oracle() fails
        try:
            sql_conn = Oracle(JndiNames.EPO_DS)
            param["buPlant"] = "1"
            if not param['bu'] and not param['materialSource']:
                param["buPlant"] = ""
            # A value containing '%' is a LIKE pattern: move it to the *LIKE key.
            for key in ('makerNo', 'makerName', 'hhNo'):
                if '%' in param[key]:
                    param[key + 'LIKE'] = param[key]
                    param[key] = ""
            if param['invalid'] == "invalid" and len(param["sourcer"]) == 0:
                param["sourcerInvalid"] = "1"

            t1 = time.time()
            rsp_info_count = DalMaterialInfo(sql_conn).download_material_info_count(param)
            rsp_info_count = rsp_info_count[0][0]
            print('rsp_info_count', rsp_info_count)
            offsets = list(range(0, rsp_info_count, step))
            print(len(offsets))

            def fetch_chunk(offset):
                """Fetch one window of rows over a dedicated connection."""
                # One connection per worker: a connection is never shared
                # between concurrently running queries.
                conn = Oracle(JndiNames.EPO_DS)
                try:
                    # NOTE(review): whether the DAL treats `end` as inclusive is
                    # not visible here; if it does, adjacent windows overlap by
                    # one row -- confirm against the SQL.
                    # `param` is shared by all workers; assumes the DAL only
                    # reads it -- TODO confirm.
                    rows = DalMaterialInfo(conn).download_material_info_start_end(
                        param, offset, offset + step, conn)
                    # Materialize before the connection is closed; assumes rows
                    # hold no lazily-read values (e.g. LOBs) -- TODO confirm.
                    return list(rows)
                finally:
                    conn.close()

            result = []
            if offsets:  # ThreadPoolExecutor rejects max_workers=0
                with ThreadPoolExecutor(max_workers=min(max_workers, len(offsets))) as pool:
                    # map() yields results in offset order and re-raises a
                    # worker's exception here, so no re-sorting is needed and
                    # failures are not silently turned into None.
                    for rows in pool.map(fetch_chunk, offsets):
                        result.extend(rows)
            print('len info list', len(offsets))
            print('len result', len(result))
            t2 = time.time()
            print('get rsp info last', t2 - t1)

            # One timestamp for both files so their names always match.
            stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
            csv_name = stamp + '.csv'
            xlsx_name = stamp + '.xlsx'
            download_dir = r'E:\code\pypy3.9\gscm_pypy\apps\download'
            csv_path = os.path.join(download_dir, csv_name)
            xlsx_path = os.path.join(download_dir, xlsx_name)
            # Errors propagate on purpose: an unwritten CSV must fail the
            # download instead of producing an empty spreadsheet.
            with open(csv_path, "w", encoding='utf_8_sig', newline='') as fp:
                writer = csv.writer(fp, delimiter=",")
                writer.writerows(result)

            t6 = time.time()
            print('write to csv', t6 - t2)

            # Create the Excel workbook metadata.
            t3 = time.time()
            head_info, workbook, worksheet, filename = create_workbook_sheet('material_info_download')
            t7 = time.time()
            print('get head info', t7 - t3, head_info, workbook, worksheet, filename)

            # NOTE(review): the converter writes to xlsx_path, while the URL
            # below is built from `filename`; confirm both refer to the file
            # the client should download.
            pyexe_path = r'E:\code\pypy3.9\pypy.exe'
            py_path = r'E:\code\pypy3.9\gscm_pypy\apps\basic_information_mgt\view\t3.py'
            a0 = "--csv_path"
            a1 = "--xlsx_path"

            # An argument list is passed directly to the interpreter; no shell
            # is needed (and shell=True with a list is platform-dependent).
            p = subprocess.Popen([pyexe_path, py_path, a0, csv_path, a1, xlsx_path],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, encoding='utf-8')

            out = p.communicate()[0]
            print('out-->', out)
            print(type(out))
            workbook.close()
            # Build the download URL.
            begin_path = get_download_beginning_by_sys_envir_new()
            download_url = begin_path + head_info.response_path + filename
            print('download_url', download_url)
            t4 = time.time()
            print('generate excel last', t4 - t3)
            print('total last', t4 - t1)
            return download_url
        except Exception as e:
            raise Exception("Download failed!" + str(e)) from e
        finally:
            # Runs on success and on failure; the session is closed even if
            # closing the connection raises.
            try:
                if sql_conn is not None:
                    sql_conn.close()
            finally:
                db.session.close()

官方参考

import cx_Oracle
import threading
from urllib import urlopen

#subclass of threading.Thread
class AsyncBlobInsert(threading.Thread):
  """Thread that reads one file-like object and inserts its content as a BLOB.

  Args:
    cur: Cursor used for the INSERT; closed when ``run`` finishes.
    input: Object with ``read()`` and ``close()``; closed when ``run`` finishes.
  """

  def __init__(self, cur, input):
    threading.Thread.__init__(self)
    self.cur = cur
    self.input = input

  def run(self):
    """Insert the input's content into blob_tab, then release both resources."""
    try:
      blobdoc = self.input.read()
      self.cur.execute("INSERT INTO blob_tab (ID, BLOBDOC) VALUES(blob_seq.NEXTVAL, :blobdoc)", {'blobdoc':blobdoc})
    finally:
      # Close both even when read() or execute() raises; the nested finally
      # guarantees the cursor is closed if closing the input fails.
      try:
        self.input.close()
      finally:
        self.cur.close()
#main thread starts here
# Each opened source is handed to one worker thread, which closes it when done.
inputs = []
inputs.append(open('/tmp/figure1.bmp', 'rb'))
# urlopen's second positional argument is request data, not a file mode,
# so no 'rb' is passed here.
inputs.append(urlopen('http://localhost/_figure2.bmp'))
dbconn = cx_Oracle.connect('usr', 'pswd', '127.0.0.1/XE',threaded=True)
dbconn.autocommit = True
workers = []
try:
  for source in inputs:
    # One cursor per thread on the shared connection.
    cur = dbconn.cursor()
    cur.setinputsizes(blobdoc=cx_Oracle.BLOB)
    th = AsyncBlobInsert(cur, source)
    th.start()
    workers.append(th)
finally:
  # Wait for every started insert before closing the shared connection.
  for th in workers:
    th.join()
  dbconn.close()

https://www.oracle.com/technical-resources/articles/embedded/vasiliev-python-concurrency.html

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/langs/718361.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存