python如何使用100多行代碼寫一個數(shù)據(jù)庫-創(chuàng)新互聯(lián)

創(chuàng)新互聯(lián)www.cdcxhl.cn八線動態(tài)BGP香港云服務(wù)器提供商,新人活動買多久送多久,劃算不套路!

超過十余年行業(yè)經(jīng)驗(yàn),技術(shù)領(lǐng)先,服務(wù)至上的經(jīng)營模式,全靠網(wǎng)絡(luò)和口碑獲得客戶,為自己降低成本,也就是為客戶降低成本。到目前業(yè)務(wù)范圍包括了:網(wǎng)站設(shè)計(jì)制作、成都做網(wǎng)站,成都網(wǎng)站推廣,成都網(wǎng)站優(yōu)化,整體網(wǎng)絡(luò)托管,小程序定制開發(fā),微信開發(fā),app軟件開發(fā),同時也可以讓客戶的網(wǎng)站和網(wǎng)絡(luò)營銷和我們一樣獲得訂單和生意!

這篇文章主要介紹了python如何使用100多行代碼寫一個數(shù)據(jù)庫,具有一定借鑒價值,需要的朋友可以參考下。希望大家閱讀完這篇文章后大有收獲。下面讓小編帶著大家一起了解一下。

數(shù)據(jù)庫的名字叫WawaDB,是用python實(shí)現(xiàn)的。由此可見python是灰常強(qiáng)大啊!

簡介

記錄日志的需求一般是這樣的:

只追加,不修改,寫入按時間順序?qū)懭耄?/p>

大量寫,少量讀,查詢一般查詢一個時間段的數(shù)據(jù);

MongoDB的固定集合很好的滿足了這個需求,但是MongoDB占內(nèi)存比較大,有點(diǎn)兒火穿蚊子,小題大做的感覺。

WawaDB的思路是每寫入1000條日志,在一個索引文件里記錄下當(dāng)前的時間和日志文件的偏移量。

然后按時間詢?nèi)罩緯r,先把索引加載到內(nèi)存中,用二分法查出時間點(diǎn)的偏移量,再打開日志文件seek到指定位置,這樣就能很快定位用戶需要的數(shù)據(jù)并讀取,而不需要遍歷整個日志文件。

性能

Core 2 P8400,2.26GHZ,2G內(nèi)存,32 bit win7

寫入測試:

模擬1分鐘寫入10000條數(shù)據(jù),共寫入5個小時的數(shù)據(jù), 插入300萬條數(shù)據(jù),每條數(shù)據(jù)54個字符,用時2分51秒

讀取測試:讀取指定時間段內(nèi)包含某個子串的日志

數(shù)據(jù)范圍 遍歷數(shù)據(jù)量 結(jié)果數(shù) 用時(秒)

5小時 300萬 604 6.6

2小時 120萬 225 2.7

1小時 60萬 96 1.3

30分鐘 30萬 44 0.6

索引

只對日志記錄的時間做索引, 簡介里大概說了下索引的實(shí)現(xiàn),二分查找肯定沒B Tree效率高,但一般情況下也差不了一個數(shù)量級,而且實(shí)現(xiàn)特別簡單。

因?yàn)槭窍∈杷饕⒉皇敲織l日志都有索引記錄它的偏移量,所以讀取數(shù)據(jù)時要往前多讀一些數(shù)據(jù),防止漏讀,等讀到真正所需的數(shù)據(jù)時再真正給用戶返回數(shù)據(jù)。

如下圖,比如用戶要讀取25到43的日志,用二分法找25,找到的是30所在的點(diǎn),

索引:0         10        20        30        40        50 日志:|.........|.........|.........|.........|.........|>>>a = [0, 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>>>bisect.bisect_left(a, 43)>>>5>>>a[5]>>>50

所以我們要往前倒一些,從20(30的前一個刻度)開始讀取日志,21,22,23,24讀取后因?yàn)楸?5小,所以扔掉, 讀到25,26,27,...后返回給用戶

讀取到40(50的前一個刻度)后就要判斷當(dāng)前數(shù)據(jù)是否大于43了,如果大于43(返回全開區(qū)間的數(shù)據(jù)),就要停止讀了。

整體下來我們只操作了大文件的很少一部分就得到了用戶想要的數(shù)據(jù)。

緩沖區(qū)

為了減少寫入日志時大量的磁盤寫,索引在append日志時,把buffer設(shè)置成了10k,系統(tǒng)默認(rèn)應(yīng)該是4k。

同理,為了提高讀取日志的效率,讀取的buffer也設(shè)置了10k,也需要根據(jù)你日志的大小做適當(dāng)調(diào)整。

索引的讀寫設(shè)置成了行buffer,每滿一行都要flush到磁盤上,防止讀到不完整的索引行(其實(shí)實(shí)踐證明,設(shè)置了行buffer,還是能讀到半拉的行)。

查詢

啥?要支持SQL,別鬧了,100行代碼怎么支持SQL呀。

現(xiàn)在查詢是直接傳入一個lambada表達(dá)式,系統(tǒng)遍歷指定時間范圍內(nèi)的數(shù)據(jù)行時,滿足用戶的lambada條件才會返回給用戶。

當(dāng)然這樣會多讀取很多用戶不需要的數(shù)據(jù),而且每行都要進(jìn)行l(wèi)ambda表達(dá)式的運(yùn)算,不過沒辦法,簡單就是美呀。

以前我是把一個需要查詢的條件和日志時間,日志文件偏移量都記錄在索引里,這樣從索引里查找出符合條件的偏移量,然后每條數(shù)據(jù)都如日志文件里seek一次,read一次。這樣好處只有一個,就是讀取的數(shù)據(jù)量少了,但缺點(diǎn)有兩個:

索引文件特別大,不方便加載到內(nèi)存中

每次讀取都要先seek,貌似緩沖區(qū)用不上,特別慢,比連續(xù)讀一個段的數(shù)據(jù),并用lambda過濾慢四五倍

寫入

前面說過了,只append,不修改數(shù)據(jù),而且每行日志最前面是時間戳。

多線程

查詢數(shù)據(jù),可以多線程同時查詢,每次查詢都會打開一個新的日志文件的描述符,所以并行的多個讀取不會打架。

寫入的話,雖然只是append操作,但不確認(rèn)多線程對文件進(jìn)行append操作是否安全,所以建議用一個隊(duì)列,一個專用線程進(jìn)行寫入。

沒有任何鎖。

排序

默認(rèn)查詢出來的數(shù)據(jù)是按時間正序排列,如需其它排序,可取到內(nèi)存后用python的sorted函數(shù)排序,想怎么排就怎么排。

# -*- coding:utf-8 -*-
import os
import time
import bisect
import itertools
from datetimeimport datetime
import logging
  
default_data_dir= './data/'
default_write_buffer_size= 1024*10
default_read_buffer_size= 1024*10
default_index_interval= 1000
  
def ensure_data_dir():
    if not os.path.exists(default_data_dir):
        os.makedirs(default_data_dir)
  
def init():
    ensure_data_dir()
  
class WawaIndex:
    def __init__(self, index_name):
        self.fp_index= open(os.path.join(default_data_dir, index_name+ '.index'),'a+',1)
        self.indexes,self.offsets,self.index_count= [], [],0
        self.__load_index()
  
    def __update_index(self, key, offset):
        self.indexes.append(key)
        self.offsets.append(offset)
  
    def __load_index(self):
        self.fp_index.seek(0)
        for linein self.fp_index:
            try:
                key, offset = line.split()
                self.__update_index(key, offset)
            except ValueError:# 索引如果沒有flush的話,可能讀到有半行的數(shù)據(jù)
                pass
  
    def append_index(self, key, offset):
        self.index_count+= 1
        if self.index_count% default_index_interval== 0:
            self.__update_index(key, offset)
            self.fp_index.write('%s %s %s' % (key, offset, os.linesep))
  
    def get_offsets(self, begin_key, end_key):
        left= bisect.bisect_left(self.indexes,str(begin_key))
        right= bisect.bisect_left(self.indexes,str(end_key))
        left, right= left- 1, right- 1
        if left <0: left= 0
        if right <0: right= 0
        if right >len(self.indexes)- 1: right= len(self.indexes)- 1
        logging.debug('get_index_range:%s %s %s %s %s %s',self.indexes[0],self.indexes[-1], begin_key, end_key, left, right)
        return self.offsets[left],self.offsets[right]
  
  
class WawaDB:
    def __init__(self, db_name):
        self.db_name= db_name
        self.fp_data_for_append= open(os.path.join(default_data_dir, db_name+ '.db'),'a', default_write_buffer_size)
        self.index= WawaIndex(db_name)
  
    def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset):
        fp_data= open(os.path.join(default_data_dir,self.db_name+ '.db'),'r', default_read_buffer_size)
        fp_data.seek(int(begin_offset))
          
        line= fp_data.readline()
        find_real_begin_offset= False
        will_read_len, read_len= int(end_offset)- int(begin_offset),0
        while line:
            read_len+= len(line)
            if (not find_real_begin_offset)and  (line <str(begin_key)):
                line= fp_data.readline()
                continue
            find_real_begin_offset= True
            if (read_len >= will_read_len)and (line >str(end_key)):break
            yield line.rstrip('\r\n')
            line= fp_data.readline()
  
    def append_data(self, data, record_time=datetime.now()):
        def check_args():
            if not data:
                raise ValueError('data is null')
            if not isinstance(data,basestring):
                raise ValueError('data is not string')
            if data.find('\r') != -1 or data.find('\n') != -1:
                raise ValueError('data contains linesep')
  
        check_args()
          
        record_time= time.mktime(record_time.timetuple())
        data= '%s %s %s' % (record_time, data, os.linesep)
        offset= self.fp_data_for_append.tell()
        self.fp_data_for_append.write(data)
        self.index.append_index(record_time, offset)
  
    def get_data(self, begin_time, end_time, data_filter=None):
        def check_args():
            if not (isinstance(begin_time, datetime)and isinstance(end_time, datetime)):
                raise ValueError('begin_time or end_time is not datetime')
  
        check_args()
  
        begin_time, end_time= time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple())
        begin_offset, end_offset= self.index.get_offsets(begin_time, end_time)
  
        for datain self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset):
            if data_filter:
                if data_filter(data):
                    yield data
            else:
                yield data
  
def test():
    from datetimeimport datetime, timedelta
    import uuid, random
    logging.getLogger().setLevel(logging.NOTSET)
  
    def time_test(test_name):
        def inner(f):
            def inner2(*args,**kargs):
                start_time= datetime.now()
                result= f(*args,**kargs)
                print '%s take time:%s' % (test_name, (datetime.now()- start_time))
                return result
            return inner2
        return inner
  
    @time_test('gen_test_data')   
    def gen_test_data(db):
        now= datetime.now()
        begin_time= now- timedelta(hours=5)
        while begin_time < now:
            print begin_time
            for iin range(10000):
                db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time)
            begin_time+= timedelta(minutes=1)
      
    @time_test('test_get_data')   
    def test_get_data(db):
        begin_time= datetime.now()- timedelta(hours=3)
        end_time= begin_time+ timedelta(minutes=120)
        results= list(db.get_data(begin_time, end_time,lambda x: x.find('1024') != -1))
        print 'test_get_data get %s results' % len(results)
  
    @time_test('get_db')   
    def get_db():
        return WawaDB('test')
  
    if not os.path.exists('./data/test.db'):
        db= get_db()
        gen_test_data(db)
        #db.index.fp_index.flush()
    
    db= get_db()
    test_get_data(db)
  
init()
  
if __name__== '__main__':
    test()

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享python如何使用100多行代碼寫一個數(shù)據(jù)庫內(nèi)容對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道,遇到問題就找創(chuàng)新互聯(lián),詳細(xì)的解決方法等著你來學(xué)習(xí)!

當(dāng)前名稱:python如何使用100多行代碼寫一個數(shù)據(jù)庫-創(chuàng)新互聯(lián)
本文地址:http://bm7419.com/article44/ijsee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站、企業(yè)網(wǎng)站制作、網(wǎng)站設(shè)計(jì)公司響應(yīng)式網(wǎng)站、網(wǎng)頁設(shè)計(jì)公司手機(jī)網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)