臺北市交通流量及特性(年度)調查數據檔案之讀取

 

背景

  • 雖然近年來臺北市交通流量及特性(年度)調查數據檔案也有.xlsx格式的副本,但是之前並沒有,因此還是決定以htm格式進行解析。
  • 經下載、更名後的年度_測點.htm檔案,並非每年都一樣,無法建立年度間的通案規則,只能將所有檔案名稱存在另一檔案中(sss.txt),逐一進行解析與轉檔。-下載過程可詳參[[2022-10-13-get_sheet]]
  • htm的解析使用bs4.BeautifulSoup(‘html.parser’)`模組
  • 程式碼詳見rd_sht3.py此處進行分段解說。

Input/Output

年度調查數據檔案名稱之讀入

  • 檔名之序列檔:sss.txt,範例如下,共3,051行
sht3_12_NI001.txt
sht3_12_NI002.txt
sht3_12_NI003.txt
...
sht3_9_SI121.htm
  • 從檔名中解析
    1. 年份,第一筆年代碼為民國90年,將其轉換為西元年(...+90+1911)
    2. 格式:
      • 如字尾是.htm,則使用bs4模組來讀取
      • 如字尾是.txt,就用字串一一解讀。
with open('sss.txt') as ftext:
    s=[line.split('\n')[0] for line in ftext]
...
for fname in s:
    yr=int(fname.split('_')[1])+90+1911
    if fname.split('.')[1]=='htm':
...
    else:
        with open(fname) as ftext:
            ln=[line.split() for line in ftext]
            if fname=='sht3_12_SI114.txt':

輸出檔案

  • 最後結果整理成csv檔案,並且輸出成'sht3_df.csv'
...
df=DataFrame(d)
for i in xrange(len(df)): #每列確認橫向綜合非為0,如果各個車種流量皆為0,則刪除該筆紀錄。
    sm=0.
    for j in nam_v:
        try:
            tt=float(df.loc[i,j])
        except:
            continue
        else:
            if tt>0:sm=sm+tt
    if sm==0.: df=df.drop(i)
cols=['year','time','dirn','name','road']+nam_v
df[cols].set_index('year').to_csv('sht3_df.csv')            

交通量之解析

使用bs4模組進行解析

  • a為每個方向各車種的交通量數據字串
  • 最多可能會有A~F等6個道路連結
...
    if fname.split('.')[1]=='htm':
        fn=open(fname,'r')
        soup = BeautifulSoup(fn,'html.parser')
        td=soup.find_all('td')
        a=[str(td[i]).split('>')[1].split('<')[0] for i in xrange(len(td))]

文字檔之讀取

  • 有些年度_測站組合並沒有.htm格式存檔,需將pdf檔案存成文字檔,再解析出a的內容,如同前面bs4解析結果一樣。
  • 2個特例之外、一般案例的解析
    1. 從檔案最末行開始讀起
    2. 從每行的最右邊開始讀起
    3. 解析每個叉路、雙向、各個車種的交通量字串序列a
      • 以試誤法每3個字串組合逐一嘗試,
      • 如果是國字,則為最左邊邊界,這之中的就是所要的數字
    else:
        with open(fname) as ftext:
            ln=[line.split() for line in ftext]
            if fname=='sht3_12_SI114.txt':ln.remove(ln[0])
            if fname!='sht3_13_NI028.txt':
                lnn=[ln[-1-i] for i in xrange(len(ln))]
                lna=[]
                for i in xrange(len(lnn)):
                    lna.append([lnn[i][-1-j] for j in xrange(len(lnn[i]))])
                ln=[]
                for i in xrange(len(lna)):
                    ln1=[]
                    for j in xrange(len(lna[i])):
                        string=lna[i][j]
                        sa=''
                        ibeg=len(string)-3
                        if ibeg >=0:
                            while True:
                                try:
                                    string[ibeg:ibeg+3].decode('utf-8')
                                except UnicodeError:
                                    sa=sa+string[ibeg+2]
                                    ibeg=ibeg-1
                                    if ibeg<0:break
                                else:
                                    if len(string[ibeg:ibeg+3].decode('utf-8'))==1:
                                        k=ibeg
                                        sa=sa+string[k]+string[k+1]+string[k+2]
                                    else:
                                        for k in xrange(ibeg+2,ibeg-1,-1):
                                            sa=sa+string[k] 
                                        if 0<ibeg<3:
                                            for k in xrange(ibeg-1,min(-1,ibeg-2),-1):
                                                sa=sa+string[k]
                                    ibeg=ibeg-3
                                    if ibeg<0:break
                        else:
                            for k in xrange(len(string)-1,-1,-1):
                               sa=sa+string[k] 
                        ln1.append(sa)
                    ln.append(ln1)
            a=list(itertools.chain(*ln))

各叉路、雙向、各車種交通量解析

  • 中文字轉成羅馬拼音
    b=[]
    for i in xrange(100):
        cha=a[i]
        if type(cha)==float:
            b.append(cha)
            continue
        if len(cha)<2:
            b.append(cha)
            continue
        ss=lazy_pinyin(cha.decode('utf8'))
        sss=''
        for j in ss:
            if j.isalnum():sss=sss+j
        b.append(sss)
  • 找到「站名」等標籤在序列b的位置,紀錄為road_s
note=['zhanming','zhan','shixiangshu'] #站名、站、時相數
...
    for nt in note:
        try:
            b_note=b.index(nt)
        except:
            road_s='not_found'
        else:
            road_s=b[b_note+1]
            break
  • 標定叉路數
def FindLastABC(end):
    for last in xrange(end,-1,-1):
        if a[last] in ABC:break
    return last
ABC=['A','B','C','D','E','F']
...
    last=FindLastABC(len(a)-1)
    A_last=a[last]
    lastend=len(a)
    if fname =='sht3_7_SI061.htm':A_last='D'
  • 記錄年代year、時間time、方向dirn、站名name、註記標籤road、以及各車種流量tabv
year,time,dirn,name,road=[],[],[],[],[]
v=[]
for iv in xrange(8):            
    v.append([])
...
    ampm='pm'
    for chr in xrange(12): #repeat for A~F twice
...
        tab=[]
        for j in xrange(last,lastend):
            try:
                tt=float(a[j])
            except:
                if len(a[j])>1 and a[j][-1]=='%':
                    tab.append(float(a[j][:-1])/100)
            else:
                tab.append(tt)
        year.append(yr)
        time.append(ampm)
        dirn.append(a[last])
        name.append((fname.split('_')[2]).split('.')[0])
        road.append(road_s)
        if len(tab) <8:
            for iv in xrange(8-len(tab)):
                tab.insert(0,0.0)
        len_tab=len(tab)
        for gt1 in xrange(1,10):
            tt=tab[len(tab)-gt1]
            if 0< tt and tt < 1:
                len_tab=len_tab-gt1+1
                break
        if tab[len_tab-1] > 1 or tab[len_tab-1]==0.: tab=[0.0 for x in tab]            
        for iv in xrange(len_tab-8,len_tab):            
            v[iv-(len_tab-8)].append(tab[iv])
        lastend=last
        last=FindLastABC(last-1)
        if a[last] == A_last and ampm=='am':break
        if a[last] == A_last and ampm=='pm':
            ampm='am'
  • 整合名稱及序列成為dict
nam_v=['Lvolume','Lratio','Mvolume','Mratio','Svolume','Sratio','PCU','PHF']
...
d={'year':year,'time':time,'dirn':dirn,'name':name,'road':road}
for iv in xrange(8):
    d.update({nam_v[iv]:v[iv]})

輸出檔案範例

  • 最後結果整理成csv檔案,並且輸出成'sht3_df.csv'
kuang@master /home/backup/data/ETC/TaipeiVD/htm
$ head sht3_df.csv
year,time,dirn,name,road,Lvolume,Lratio,Mvolume,Mratio,Svolume,Sratio,PCU,PHF
2013,pm,D,NI001,dazhiqiaobeianlu,0.0,0.0,92.0,0.28,238.0,0.72,197.0,0.73
2013,pm,C,NI001,dazhiqiaobeianlu,54.0,0.1,300.0,0.55,191.0,0.35,511.0,0.79
2013,pm,B,NI001,dazhiqiaobeianlu,14.0,0.0,1126.0,0.34,2167.0,0.66,2191.0,0.91